Add AMQP prefix recognition to CloudEventMessageUtils
This commit is contained in:
@@ -142,8 +142,8 @@ public class CloudEventAttributes extends HashMap<String, Object> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <A> A getAtttribute(String attrName) {
|
||||
if (this.containsKey(CloudEventMessageUtils.ATTR_PREFIX + attrName)) {
|
||||
return (A) this.get(CloudEventMessageUtils.ATTR_PREFIX + attrName);
|
||||
if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName)) {
|
||||
return (A) this.get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName);
|
||||
}
|
||||
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName)) {
|
||||
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName);
|
||||
@@ -165,8 +165,8 @@ public class CloudEventAttributes extends HashMap<String, Object> {
|
||||
}
|
||||
|
||||
String getAttributeName(String attributeName) {
|
||||
if (this.containsKey(CloudEventMessageUtils.ATTR_PREFIX + attributeName)) {
|
||||
return CloudEventMessageUtils.ATTR_PREFIX + attributeName;
|
||||
if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName)) {
|
||||
return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName;
|
||||
}
|
||||
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName)) {
|
||||
return CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName;
|
||||
|
||||
@@ -61,15 +61,20 @@ public final class CloudEventMessageUtils {
|
||||
public static MimeType APPLICATION_CLOUDEVENTS = MimeTypeUtils.parseMimeType(APPLICATION_CLOUDEVENTS_VALUE);
|
||||
|
||||
/**
|
||||
* Prefix for attributes.
|
||||
* Default attributes prefix which also suits Kafka.
|
||||
*/
|
||||
public static String ATTR_PREFIX = "ce_";
|
||||
public static String DEFAULT_ATTR_PREFIX = "ce_";
|
||||
|
||||
/**
|
||||
* Prefix for attributes.
|
||||
* HTTP attributes prefix.
|
||||
*/
|
||||
public static String HTTP_ATTR_PREFIX = "ce-";
|
||||
|
||||
/**
|
||||
* AMQP attributes prefix.
|
||||
*/
|
||||
public static String AMQP_ATTR_PREFIX = "cloudEvents:";
|
||||
|
||||
/**
|
||||
* Value for 'data' attribute.
|
||||
*/
|
||||
@@ -78,7 +83,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'data' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_DATA = ATTR_PREFIX + DATA;
|
||||
public static String CANONICAL_DATA = DEFAULT_ATTR_PREFIX + DATA;
|
||||
|
||||
/**
|
||||
* Value for 'id' attribute.
|
||||
@@ -88,7 +93,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'id' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_ID = ATTR_PREFIX + ID;
|
||||
public static String CANONICAL_ID = DEFAULT_ATTR_PREFIX + ID;
|
||||
|
||||
/**
|
||||
* Value for 'source' attribute.
|
||||
@@ -98,7 +103,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'source' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_SOURCE = ATTR_PREFIX + SOURCE;
|
||||
public static String CANONICAL_SOURCE = DEFAULT_ATTR_PREFIX + SOURCE;
|
||||
|
||||
/**
|
||||
* Value for 'specversion' attribute.
|
||||
@@ -108,7 +113,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'specversion' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_SPECVERSION = ATTR_PREFIX + SPECVERSION;
|
||||
public static String CANONICAL_SPECVERSION = DEFAULT_ATTR_PREFIX + SPECVERSION;
|
||||
|
||||
/**
|
||||
* Value for 'type' attribute.
|
||||
@@ -118,7 +123,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'type' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_TYPE = ATTR_PREFIX + TYPE;
|
||||
public static String CANONICAL_TYPE = DEFAULT_ATTR_PREFIX + TYPE;
|
||||
|
||||
/**
|
||||
* Value for 'datacontenttype' attribute.
|
||||
@@ -128,7 +133,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'datacontenttype' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE;
|
||||
public static String CANONICAL_DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + DATACONTENTTYPE;
|
||||
|
||||
/**
|
||||
* Value for 'dataschema' attribute.
|
||||
@@ -138,7 +143,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'dataschema' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_DATASCHEMA = ATTR_PREFIX + DATASCHEMA;
|
||||
public static String CANONICAL_DATASCHEMA = DEFAULT_ATTR_PREFIX + DATASCHEMA;
|
||||
|
||||
/**
|
||||
* Value for 'subject' attribute.
|
||||
@@ -148,7 +153,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'subject' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_SUBJECT = ATTR_PREFIX + SUBJECT;
|
||||
public static String CANONICAL_SUBJECT = DEFAULT_ATTR_PREFIX + SUBJECT;
|
||||
|
||||
/**
|
||||
* Value for 'time' attribute.
|
||||
@@ -158,7 +163,7 @@ public final class CloudEventMessageUtils {
|
||||
/**
|
||||
* Value for 'time' attribute with prefix.
|
||||
*/
|
||||
public static String CANONICAL_TIME = ATTR_PREFIX + TIME;
|
||||
public static String CANONICAL_TIME = DEFAULT_ATTR_PREFIX + TIME;
|
||||
|
||||
/**
|
||||
* Checks if {@link Message} represents cloud event in binary-mode.
|
||||
@@ -262,8 +267,11 @@ public final class CloudEventMessageUtils {
|
||||
if (keys.contains("user-agent")) {
|
||||
return CloudEventMessageUtils.HTTP_ATTR_PREFIX;
|
||||
}
|
||||
else if (keys.contains("amqp")) {
|
||||
return CloudEventMessageUtils.AMQP_ATTR_PREFIX;
|
||||
}
|
||||
else {
|
||||
return CloudEventMessageUtils.ATTR_PREFIX;
|
||||
return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; // default which also suits Kafka 'ce_'
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user