diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index 75552af59..dc85f16f5 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.StringUtils; /** * Message builder which is aware of Cloud Event semantics. @@ -145,16 +146,33 @@ public final class CloudEventMessageBuilder { return this.doBuild(); } + public Message build(String attributePrefixToUse) { - String[] keys = this.headers.keySet().toArray(new String[] {}); - for (String key : keys) { - Object value = this.headers.remove(key); - this.headers.put(attributePrefixToUse + key, value); + if (StringUtils.hasText(attributePrefixToUse)) { + String[] keys = this.headers.keySet().toArray(new String[] {}); + for (String key : keys) { + if (key.startsWith(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX.length()); + headers.put(attributePrefixToUse + key, value); + } + else if (key.startsWith(CloudEventMessageUtils.AMQP_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(CloudEventMessageUtils.AMQP_ATTR_PREFIX.length()); + headers.put(attributePrefixToUse + key, value); + } + else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(CloudEventMessageUtils.KAFKA_ATTR_PREFIX.length()); + headers.put(attributePrefixToUse + key, value); + } + } } + if (!this.headers.containsKey(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION)) { this.headers.put(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION, "1.0"); } - return build(); + return doBuild(); } private Message doBuild() { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index e45cb1f85..72ca8fc12 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -21,7 +21,6 @@ import java.net.URI; import java.time.OffsetTime; import java.util.Collections; import java.util.Map; -import java.util.Set; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -48,8 +47,13 @@ public final class CloudEventMessageUtils { private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver(); - private CloudEventMessageUtils() { + private static Field MESSAGE_HEADERS = ReflectionUtils.findField(MessageHeaders.class, "headers"); + static { + MESSAGE_HEADERS.setAccessible(true); + } + + private CloudEventMessageUtils() { } /** @@ -65,7 +69,7 @@ public final class CloudEventMessageUtils { /** * Prefix for attributes. */ - public static String DEFAULT_ATTR_PREFIX = "ce_"; + public static String DEFAULT_ATTR_PREFIX = "ce-"; /** * AMQP attributes prefix. @@ -75,57 +79,57 @@ public final class CloudEventMessageUtils { /** * Prefix for attributes. */ - public static String HTTP_ATTR_PREFIX = "ce-"; + public static String KAFKA_ATTR_PREFIX = "ce_"; /** * Value for 'data' attribute. */ - public static String DATA = "data"; + public static String DATA = DEFAULT_ATTR_PREFIX + "data"; /** * Value for 'id' attribute. */ - public static String ID = "id"; + public static String ID = DEFAULT_ATTR_PREFIX + "id"; /** * Value for 'source' attribute. */ - public static String SOURCE = "source"; + public static String SOURCE = DEFAULT_ATTR_PREFIX + "source"; /** * Value for 'specversion' attribute. */ - public static String SPECVERSION = "specversion"; + public static String SPECVERSION = DEFAULT_ATTR_PREFIX + "specversion"; /** * Value for 'type' attribute. */ - public static String TYPE = "type"; + public static String TYPE = DEFAULT_ATTR_PREFIX + "type"; /** * Value for 'datacontenttype' attribute. */ - public static String DATACONTENTTYPE = "datacontenttype"; + public static String DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + "datacontenttype"; /** * Value for 'dataschema' attribute. */ - public static String DATASCHEMA = "dataschema"; + public static String DATASCHEMA = DEFAULT_ATTR_PREFIX + "dataschema"; /** * V03 name for 'dataschema' attribute. */ - public static final String SCHEMAURL = "schemaurl"; + public static final String SCHEMAURL = DEFAULT_ATTR_PREFIX + "schemaurl"; /** * Value for 'subject' attribute. */ - public static String SUBJECT = "subject"; + public static String SUBJECT = DEFAULT_ATTR_PREFIX + "subject"; /** * Value for 'time' attribute. */ - public static String TIME = "time"; + public static String TIME = DEFAULT_ATTR_PREFIX + "time"; public static String getId(Message message) { if (message.getHeaders().containsKey("_id")) { @@ -171,16 +175,13 @@ public final class CloudEventMessageUtils { } @SuppressWarnings("unchecked") - protected static Message toCannonical(Message inputMessage, MessageConverter messageConverter) { - - Field headersField = ReflectionUtils.findField(MessageHeaders.class, "headers"); - headersField.setAccessible(true); - Map headers = (Map) ReflectionUtils.getField(headersField, inputMessage.getHeaders()); - canonicalizeHeaders(headers); + protected static Message toCanonical(Message inputMessage, MessageConverter messageConverter) { + Map headers = (Map) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders()); + canonicalizeHeaders(headers, false); String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE); // first check the obvious and see if content-type is `cloudevents` - if (!isBinary(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { + if (!isCloudEvent(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders()); if (contentType.getType().equals(APPLICATION_CLOUDEVENTS.getType()) && contentType .getSubtype().startsWith(APPLICATION_CLOUDEVENTS.getSubtype())) { @@ -197,7 +198,7 @@ public final class CloudEventMessageUtils { Map structuredCloudEvent = (Map) messageConverter .fromMessage(cloudEventMessage, Map.class); - canonicalizeHeaders(structuredCloudEvent); + canonicalizeHeaders(structuredCloudEvent, true); Message binaryCeMessage = buildBinaryMessageFromStructuredMap(structuredCloudEvent, inputMessage.getHeaders()); @@ -221,30 +222,14 @@ public final class CloudEventMessageUtils { * @return prefix (e.g., 'ce_' or 'ce-' etc.) */ protected static String determinePrefixToUse(Map messageHeaders) { - Set keys = messageHeaders.keySet(); - if (keys.contains("user-agent")) { - return HTTP_ATTR_PREFIX; - } - else { - for (String key : messageHeaders.keySet()) { - if (key.startsWith("kafka_")) { - return DEFAULT_ATTR_PREFIX; - } - else if (key.startsWith("amqp_")) { - return AMQP_ATTR_PREFIX; - } - else if (key.startsWith(DEFAULT_ATTR_PREFIX)) { - return DEFAULT_ATTR_PREFIX; - } - else if (key.startsWith(HTTP_ATTR_PREFIX)) { - return HTTP_ATTR_PREFIX; - } - else if (key.startsWith(AMQP_ATTR_PREFIX)) { - return AMQP_ATTR_PREFIX; - } + for (String key : messageHeaders.keySet()) { + if (key.startsWith(KAFKA_ATTR_PREFIX)) { + return KAFKA_ATTR_PREFIX; + } + else if (key.startsWith(AMQP_ATTR_PREFIX)) { + return AMQP_ATTR_PREFIX; } } - return ""; } @@ -254,7 +239,7 @@ public final class CloudEventMessageUtils { * @param message input {@link Message} * @return true if this Message represents Cloud Event in binary-mode */ - protected static boolean isBinary(Message message) { + protected static boolean isCloudEvent(Message message) { return message.getHeaders().containsKey(SPECVERSION) && message.getHeaders().containsKey(TYPE) && message.getHeaders().containsKey(SOURCE); @@ -265,23 +250,27 @@ public final class CloudEventMessageUtils { * So, for example 'ce_source' will become 'source'. * @param headers message headers */ - private static void canonicalizeHeaders(Map headers) { + private static void canonicalizeHeaders(Map headers, boolean structured) { String[] keys = headers.keySet().toArray(new String[] {}); for (String key : keys) { - if (key.startsWith(HTTP_ATTR_PREFIX)) { - Object value = headers.remove(key); - key = key.substring(HTTP_ATTR_PREFIX.length()); - headers.put(key, value); - } - else if (key.startsWith(DEFAULT_ATTR_PREFIX)) { + if (key.startsWith(DEFAULT_ATTR_PREFIX)) { Object value = headers.remove(key); key = key.substring(DEFAULT_ATTR_PREFIX.length()); - headers.put(key, value); + headers.put(DEFAULT_ATTR_PREFIX + key, value); + } + else if (key.startsWith(KAFKA_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(KAFKA_ATTR_PREFIX.length()); + headers.put(DEFAULT_ATTR_PREFIX + key, value); } else if (key.startsWith(AMQP_ATTR_PREFIX)) { Object value = headers.remove(key); key = key.substring(AMQP_ATTR_PREFIX.length()); - headers.put(key, value); + headers.put(DEFAULT_ATTR_PREFIX + key, value); + } + else if (structured) { + Object value = headers.remove(key); + headers.put(DEFAULT_ATTR_PREFIX + key, value); } } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java index e651fe509..2f8e701af 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -57,13 +57,13 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper preProcessInput(Message input, Object inputConverter) { - return CloudEventMessageUtils.toCannonical(input, (MessageConverter) inputConverter); + return CloudEventMessageUtils.toCanonical(input, (MessageConverter) inputConverter); } @Override public Message postProcessResult(Message input, Object result) { Message resultMessage = null; - if (CloudEventMessageUtils.isBinary(input)) { + if (CloudEventMessageUtils.isCloudEvent(input)) { CloudEventMessageBuilder messageBuilder = CloudEventMessageBuilder .withData(result) .setId(UUID.randomUUID().toString()) @@ -75,6 +75,7 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper resultMessage = (Message) function.apply(inputMessage); @@ -66,7 +67,7 @@ public class CloudEventFunctionTests { * both on input and output that it is dealing with Cloud Event and generates * appropriate headers/attributes */ - assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue(); assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName()); assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } @@ -95,7 +96,7 @@ public class CloudEventFunctionTests { * both on input and output that it is dealing with Cloud Event and generates * appropriate headers/attributes */ - assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue(); assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName()); assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } @@ -117,12 +118,12 @@ public class CloudEventFunctionTests { "}"; Function function = this.lookup("springRelease", TestConfiguration.class); - Message inputMessage = CloudEventMessageBuilder - .withData(payload) + Message inputMessage = MessageBuilder + .withPayload(payload) .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") .build(); - assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse(); + assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse(); Message resultMessage = (Message) function.apply(inputMessage); assertThat(resultMessage.getPayload().getReleaseDate()) @@ -133,7 +134,7 @@ public class CloudEventFunctionTests { // * both on input and output that it is dealing with Cloud Event and generates // * appropriate headers/attributes // */ - assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue(); assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName()); assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } @@ -158,7 +159,7 @@ public class CloudEventFunctionTests { .withData(payload) .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") .build(); - assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse(); + assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse(); Message resultMessage = (Message) function.apply(inputMessage); assertThat(resultMessage.getPayload().getReleaseDate()) @@ -169,7 +170,7 @@ public class CloudEventFunctionTests { * both on input and output that it is dealing with Cloud Event and generates * appropriate headers/attributes */ - assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue(); assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName()); assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index ef44106a1..995b8c48c 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -131,7 +131,7 @@ public class CloudeventDemoApplication { @Bean public Consumer> pojoConsumer(CloudEventHeaderEnricher enricher, RestTemplateBuilder builder) { return eventMessage -> { - Message newMessage = enricher.enrich(CloudEventMessageBuilder.fromMessage(eventMessage)).build(CloudEventMessageUtils.HTTP_ATTR_PREFIX); + Message newMessage = enricher.enrich(CloudEventMessageBuilder.fromMessage(eventMessage)).build(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX); RequestEntity entity = RequestEntity.post(URI.create("http://foo.com")) .headers(HeaderUtils.fromMessage(newMessage.getHeaders())) .body(eventMessage.getPayload()); diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index b6f002021..db6aeb308 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -130,7 +130,7 @@ public class CloudeventDemoApplicationRESTTests { SpringApplication.run(new Class[] {CloudeventDemoApplication.class, FooBarConverterConfiguration.class}, new String[] {}); HttpHeaders headers = this.buildHeaders(MediaType.valueOf("application/cloudevents+json;charset=utf-8")); - headers.set("datacontenttype", "foo/bar"); + headers.set(CloudEventMessageUtils.DATACONTENTTYPE, "foo/bar"); String payload = "24-03-2004:Spring Framework:1.0"; RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asPOJOMessage")); @@ -171,11 +171,11 @@ public class CloudeventDemoApplicationRESTTests { response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList("com.interface21")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull(); } @Test @@ -207,11 +207,11 @@ public class CloudeventDemoApplicationRESTTests { response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"version\":\"1.0\",\"releaseName\":\"Spring Framework\",\"releaseDate\":\"24-03-2004\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList("com.interface21")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull(); } @Test @@ -225,9 +225,9 @@ public class CloudeventDemoApplicationRESTTests { ResponseEntity response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2050\",\"releaseName\":\"Spring Framework\",\"version\":\"10.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList("com.interface21")); } @@ -242,9 +242,9 @@ public class CloudeventDemoApplicationRESTTests { ResponseEntity response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList("com.interface21")); } @@ -259,24 +259,20 @@ public class CloudeventDemoApplicationRESTTests { HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); - headers.set(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString()); - headers.set(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/"); - headers.set(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0"); - headers.set(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework"); + headers.set(CloudEventMessageUtils.ID, UUID.randomUUID().toString()); + headers.set(CloudEventMessageUtils.SOURCE, "https://spring.io/"); + headers.set(CloudEventMessageUtils.SPECVERSION, "1.0"); + headers.set(CloudEventMessageUtils.TYPE, "org.springframework"); String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo")); ResponseEntity response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList("com.interface21")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.TYPE)).isNull(); - assertThat(response.getHeaders().get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)).isNull(); - assertThat(response.getHeaders().get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNull(); - assertThat(response.getHeaders().get(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION)).isNull(); } @@ -317,11 +313,11 @@ public class CloudeventDemoApplicationRESTTests { assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework"); assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("https://interface21.com/")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList("com.interface21")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)).isNotNull(); + assertThat(response.getHeaders().get(CloudEventMessageUtils.ID)).isNotNull(); } @Test @@ -345,10 +341,10 @@ public class CloudeventDemoApplicationRESTTests { private HttpHeaders buildHeaders(MediaType contentType) { HttpHeaders headers = new HttpHeaders(); headers.setContentType(contentType); - headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString()); - headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/"); - headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0"); - headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework"); + headers.set(CloudEventMessageUtils.ID, UUID.randomUUID().toString()); + headers.set(CloudEventMessageUtils.SOURCE, "https://spring.io/"); + headers.set(CloudEventMessageUtils.SPECVERSION, "1.0"); + headers.set(CloudEventMessageUtils.TYPE, "org.springframework"); return headers; } @@ -383,7 +379,8 @@ public class CloudeventDemoApplicationRESTTests { if (targetClass == null || !supportsMimeType(message.getHeaders())) { return false; } - else if (message.getHeaders().containsKey("datacontenttype") && message.getHeaders().get("datacontenttype").equals("foo/bar")) { + else if (message.getHeaders().containsKey(CloudEventMessageUtils.DATACONTENTTYPE) + && message.getHeaders().get(CloudEventMessageUtils.DATACONTENTTYPE).equals("foo/bar")) { return true; } return false; @@ -391,8 +388,8 @@ public class CloudeventDemoApplicationRESTTests { @Override protected Object convertFromInternal(Message message, Class targetClass, @Nullable Object conversionHint) { - if (message.getHeaders().containsKey("datacontenttype") - && message.getHeaders().get("datacontenttype").equals("foo/bar") + if (message.getHeaders().containsKey(CloudEventMessageUtils.DATACONTENTTYPE) + && message.getHeaders().get(CloudEventMessageUtils.DATACONTENTTYPE).equals("foo/bar") && SpringReleaseEvent.class == targetClass) { SpringReleaseEvent event = new SpringReleaseEvent(); String[] data = ((String) message.getPayload()).split(":");