diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java index 3d1883f04..2592717cd 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java @@ -142,8 +142,8 @@ public class CloudEventAttributes extends HashMap { */ @SuppressWarnings("unchecked") public A getAtttribute(String attrName) { - if (this.containsKey(CloudEventMessageUtils.ATTR_PREFIX + attrName)) { - return (A) this.get(CloudEventMessageUtils.ATTR_PREFIX + attrName); + if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName)) { + return (A) this.get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attrName); } else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName)) { return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attrName); @@ -165,8 +165,8 @@ public class CloudEventAttributes extends HashMap { } String getAttributeName(String attributeName) { - if (this.containsKey(CloudEventMessageUtils.ATTR_PREFIX + attributeName)) { - return CloudEventMessageUtils.ATTR_PREFIX + attributeName; + if (this.containsKey(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName)) { + return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + attributeName; } else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName)) { return CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index cd2cda4f0..08901e2d1 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -61,15 +61,20 @@ public final class CloudEventMessageUtils { public static MimeType APPLICATION_CLOUDEVENTS = MimeTypeUtils.parseMimeType(APPLICATION_CLOUDEVENTS_VALUE); /** - * Prefix for attributes. + * Default attributes prefix which also suits Kafka. */ - public static String ATTR_PREFIX = "ce_"; + public static String DEFAULT_ATTR_PREFIX = "ce_"; /** - * Prefix for attributes. + * HTTP attributes prefix. */ public static String HTTP_ATTR_PREFIX = "ce-"; + /** + * AMQP attributes prefix. + */ + public static String AMQP_ATTR_PREFIX = "cloudEvents:"; + /** * Value for 'data' attribute. */ @@ -78,7 +83,7 @@ public final class CloudEventMessageUtils { /** * Value for 'data' attribute with prefix. */ - public static String CANONICAL_DATA = ATTR_PREFIX + DATA; + public static String CANONICAL_DATA = DEFAULT_ATTR_PREFIX + DATA; /** * Value for 'id' attribute. @@ -88,7 +93,7 @@ public final class CloudEventMessageUtils { /** * Value for 'id' attribute with prefix. */ - public static String CANONICAL_ID = ATTR_PREFIX + ID; + public static String CANONICAL_ID = DEFAULT_ATTR_PREFIX + ID; /** * Value for 'source' attribute. @@ -98,7 +103,7 @@ public final class CloudEventMessageUtils { /** * Value for 'source' attribute with prefix. */ - public static String CANONICAL_SOURCE = ATTR_PREFIX + SOURCE; + public static String CANONICAL_SOURCE = DEFAULT_ATTR_PREFIX + SOURCE; /** * Value for 'specversion' attribute. @@ -108,7 +113,7 @@ public final class CloudEventMessageUtils { /** * Value for 'specversion' attribute with prefix. */ - public static String CANONICAL_SPECVERSION = ATTR_PREFIX + SPECVERSION; + public static String CANONICAL_SPECVERSION = DEFAULT_ATTR_PREFIX + SPECVERSION; /** * Value for 'type' attribute. @@ -118,7 +123,7 @@ public final class CloudEventMessageUtils { /** * Value for 'type' attribute with prefix. */ - public static String CANONICAL_TYPE = ATTR_PREFIX + TYPE; + public static String CANONICAL_TYPE = DEFAULT_ATTR_PREFIX + TYPE; /** * Value for 'datacontenttype' attribute. @@ -128,7 +133,7 @@ public final class CloudEventMessageUtils { /** * Value for 'datacontenttype' attribute with prefix. */ - public static String CANONICAL_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE; + public static String CANONICAL_DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + DATACONTENTTYPE; /** * Value for 'dataschema' attribute. @@ -138,7 +143,7 @@ public final class CloudEventMessageUtils { /** * Value for 'dataschema' attribute with prefix. */ - public static String CANONICAL_DATASCHEMA = ATTR_PREFIX + DATASCHEMA; + public static String CANONICAL_DATASCHEMA = DEFAULT_ATTR_PREFIX + DATASCHEMA; /** * Value for 'subject' attribute. @@ -148,7 +153,7 @@ public final class CloudEventMessageUtils { /** * Value for 'subject' attribute with prefix. */ - public static String CANONICAL_SUBJECT = ATTR_PREFIX + SUBJECT; + public static String CANONICAL_SUBJECT = DEFAULT_ATTR_PREFIX + SUBJECT; /** * Value for 'time' attribute. @@ -158,7 +163,7 @@ public final class CloudEventMessageUtils { /** * Value for 'time' attribute with prefix. */ - public static String CANONICAL_TIME = ATTR_PREFIX + TIME; + public static String CANONICAL_TIME = DEFAULT_ATTR_PREFIX + TIME; /** * Checks if {@link Message} represents cloud event in binary-mode. @@ -262,8 +267,11 @@ public final class CloudEventMessageUtils { if (keys.contains("user-agent")) { return CloudEventMessageUtils.HTTP_ATTR_PREFIX; } + else if (keys.contains("amqp")) { + return CloudEventMessageUtils.AMQP_ATTR_PREFIX; + } else { - return CloudEventMessageUtils.ATTR_PREFIX; + return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; // default which also suits Kafka 'ce_' } } diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index 76c92c2da..b6f002021 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -259,10 +259,10 @@ public class CloudeventDemoApplicationRESTTests { HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); - headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString()); - headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/"); - headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0"); - headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework"); + headers.set(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString()); + headers.set(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/"); + headers.set(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0"); + headers.set(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework"); String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo")); @@ -273,10 +273,10 @@ public class CloudeventDemoApplicationRESTTests { .isEqualTo(Collections.singletonList("https://interface21.com/")); assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList("com.interface21")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.TYPE)).isNull(); - assertThat(response.getHeaders().get(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SOURCE)).isNull(); - assertThat(response.getHeaders().get(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.ID)).isNull(); - assertThat(response.getHeaders().get(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION)).isNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.TYPE)).isNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)).isNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION)).isNull(); }