From 4c69ca1cd607a327f3e103fc6b852a507ab5d8a5 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 2 Dec 2020 12:45:45 +0100 Subject: [PATCH] interim --- .../cloudevent/CloudEventMessageBuilder.java | 9 +- .../cloudevent/CloudEventMessageUtils.java | 84 ++++++++++++++----- .../CloudEventsFunctionInvocationHelper.java | 2 + 3 files changed, 70 insertions(+), 25 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index dc85f16f5..50d58f5b4 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -58,9 +58,9 @@ public final class CloudEventMessageBuilder { } @SuppressWarnings("unchecked") - public static CloudEventMessageBuilder fromMessage(Message message) { + public static CloudEventMessageBuilder fromMessage(Message message) { CloudEventMessageBuilder builder = new CloudEventMessageBuilder(new HashMap<>(message.getHeaders())); - builder.data = (T) message.getPayload(); + builder.data = message.getPayload(); return builder; } @@ -169,8 +169,9 @@ public final class CloudEventMessageBuilder { } } - if (!this.headers.containsKey(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION)) { - this.headers.put(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION, "1.0"); + if (!this.headers.containsKey(attributePrefixToUse + "specversion")) { + String prefix = StringUtils.hasText(attributePrefixToUse) ? attributePrefixToUse : CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; + this.headers.put(prefix + CloudEventMessageUtils._SPECVERSION, "1.0"); } return doBuild(); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 72ca8fc12..3802ad378 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -21,6 +21,7 @@ import java.net.URI; import java.time.OffsetTime; import java.util.Collections; import java.util.Map; +import java.util.stream.Collectors; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -56,6 +57,28 @@ public final class CloudEventMessageUtils { private CloudEventMessageUtils() { } + //===== + + static String _DATA = "data"; + + static String _ID = "id"; + + static String _SOURCE = "source"; + + static String _SPECVERSION = "specversion"; + + static String _TYPE = "type"; + + static String _DATACONTENTTYPE = "datacontenttype"; + + static String _DATASCHEMA = "dataschema"; + + static String _SCHEMAURL = "schemaurl"; + + static String _SUBJECT = "subject"; + + static String _TIME = "time"; + /** * String value of 'application/cloudevents' mime type. */ @@ -84,52 +107,53 @@ public final class CloudEventMessageUtils { /** * Value for 'data' attribute. */ - public static String DATA = DEFAULT_ATTR_PREFIX + "data"; + public static String DATA = DEFAULT_ATTR_PREFIX + _DATA; /** * Value for 'id' attribute. */ - public static String ID = DEFAULT_ATTR_PREFIX + "id"; + public static String ID = DEFAULT_ATTR_PREFIX + _ID; /** * Value for 'source' attribute. */ - public static String SOURCE = DEFAULT_ATTR_PREFIX + "source"; + public static String SOURCE = DEFAULT_ATTR_PREFIX + _SOURCE; /** * Value for 'specversion' attribute. */ - public static String SPECVERSION = DEFAULT_ATTR_PREFIX + "specversion"; + public static String SPECVERSION = DEFAULT_ATTR_PREFIX + _SPECVERSION; /** * Value for 'type' attribute. */ - public static String TYPE = DEFAULT_ATTR_PREFIX + "type"; + public static String TYPE = DEFAULT_ATTR_PREFIX + _TYPE; /** * Value for 'datacontenttype' attribute. */ - public static String DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + "datacontenttype"; + public static String DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + _DATACONTENTTYPE; /** * Value for 'dataschema' attribute. */ - public static String DATASCHEMA = DEFAULT_ATTR_PREFIX + "dataschema"; + public static String DATASCHEMA = DEFAULT_ATTR_PREFIX + _DATASCHEMA; /** * V03 name for 'dataschema' attribute. */ - public static final String SCHEMAURL = DEFAULT_ATTR_PREFIX + "schemaurl"; + public static final String SCHEMAURL = DEFAULT_ATTR_PREFIX + _SCHEMAURL; /** * Value for 'subject' attribute. */ - public static String SUBJECT = DEFAULT_ATTR_PREFIX + "subject"; + public static String SUBJECT = DEFAULT_ATTR_PREFIX + _SUBJECT; /** * Value for 'time' attribute. */ - public static String TIME = DEFAULT_ATTR_PREFIX + "time"; + public static String TIME = DEFAULT_ATTR_PREFIX + _TIME; + public static String getId(Message message) { if (message.getHeaders().containsKey("_id")) { @@ -141,41 +165,52 @@ public final class CloudEventMessageUtils { public static URI getSource(Message message) { String prefix = determinePrefixToUse(message.getHeaders()); - return safeGetURI(message.getHeaders(), prefix + SOURCE); + return safeGetURI(message.getHeaders(), prefix + _SOURCE); } public static String getSpecVersion(Message message) { String prefix = determinePrefixToUse(message.getHeaders()); - return (String) message.getHeaders().get(prefix + SPECVERSION); + return (String) message.getHeaders().get(prefix + _SPECVERSION); } public static String getType(Message message) { String prefix = determinePrefixToUse(message.getHeaders()); - return (String) message.getHeaders().get(prefix + TYPE); + return (String) message.getHeaders().get(prefix + _TYPE); } public static String getDataContentType(Message message) { String prefix = determinePrefixToUse(message.getHeaders()); - return (String) message.getHeaders().get(prefix + DATACONTENTTYPE); + return (String) message.getHeaders().get(prefix + _DATACONTENTTYPE); } public static URI getDataSchema(Message message) { String prefix = determinePrefixToUse(message.getHeaders()); - return safeGetURI(message.getHeaders(), prefix + DATASCHEMA); + return safeGetURI(message.getHeaders(), prefix + _DATASCHEMA); } public static String getSubject(Message message) { String prefix = determinePrefixToUse(message.getHeaders()); - return (String) message.getHeaders().get(prefix + SUBJECT); + return (String) message.getHeaders().get(prefix + _SUBJECT); } public static OffsetTime getTime(Message message) { String prefix = determinePrefixToUse(message.getHeaders()); - return (OffsetTime) message.getHeaders().get(prefix + TIME); + return (OffsetTime) message.getHeaders().get(prefix + _TIME); } @SuppressWarnings("unchecked") - protected static Message toCanonical(Message inputMessage, MessageConverter messageConverter) { + public static T getData(Message message) { + return (T) message.getPayload(); + } + + public static Map getAttributes(Message message) { + return message.getHeaders().entrySet().stream() + .filter(e -> isAttribute(e.getKey())) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + } + + @SuppressWarnings("unchecked") + static Message toCanonical(Message inputMessage, MessageConverter messageConverter) { Map headers = (Map) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders()); canonicalizeHeaders(headers, false); @@ -221,9 +256,12 @@ public final class CloudEventMessageUtils { * @param messageHeaders map of message headers * @return prefix (e.g., 'ce_' or 'ce-' etc.) */ - protected static String determinePrefixToUse(Map messageHeaders) { + static String determinePrefixToUse(Map messageHeaders) { for (String key : messageHeaders.keySet()) { - if (key.startsWith(KAFKA_ATTR_PREFIX)) { + if (key.startsWith(DEFAULT_ATTR_PREFIX)) { + return DEFAULT_ATTR_PREFIX; + } + else if (key.startsWith(KAFKA_ATTR_PREFIX)) { return KAFKA_ATTR_PREFIX; } else if (key.startsWith(AMQP_ATTR_PREFIX)) { @@ -239,12 +277,16 @@ public final class CloudEventMessageUtils { * @param message input {@link Message} * @return true if this Message represents Cloud Event in binary-mode */ - protected static boolean isCloudEvent(Message message) { + static boolean isCloudEvent(Message message) { return message.getHeaders().containsKey(SPECVERSION) && message.getHeaders().containsKey(TYPE) && message.getHeaders().containsKey(SOURCE); } + private static boolean isAttribute(String key) { + return key.startsWith(DEFAULT_ATTR_PREFIX) || key.startsWith(AMQP_ATTR_PREFIX) || key.startsWith(KAFKA_ATTR_PREFIX); + } + /** * Will canonicalize Cloud Event attributes (headers) by removing well known prefixes. * So, for example 'ce_source' will become 'source'. diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java index 2f8e701af..ce12537e2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -32,6 +32,8 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; /** + * Implementation of {@link FunctionInvocationHelper} to support Cloud Events. + * This is a primary (and the only) integration bridge with {@link FunctionInvocationHelper}. * * @author Oleg Zhurakousky * @since 2.0