diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java index 485fef3a1..3e5b3fa8f 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventHeaderEnricher.java @@ -16,8 +16,6 @@ package org.springframework.cloud.function.cloudevent; - - /** * Strategy that should be implemented by the user to help with outgoing Cloud Event * headers.
@@ -32,15 +30,16 @@ package org.springframework.cloud.function.cloudevent; * *
  * @Bean
- * public CloudEventHeadersProvider cloudEventHeadersProvider() {
- * 	return attributes ->
- *		CloudEventHeaderUtils.fromAttributes(attributes).withSource("https://interface21.com/").withType("com.interface21").build();
+ * public CloudEventHeaderEnricher cloudEventHeaderEnricher() {
+ *  return headers -> {
+ *   return headers.setSource("https://interface21.com/").setType("com.interface21");
+ *  };
  * }
  * 
* * @author Oleg Zhurakousky * @author Dave Syer - * @since 2.0 + * @since 3.1 */ @FunctionalInterface public interface CloudEventHeaderEnricher { @@ -49,6 +48,12 @@ public interface CloudEventHeaderEnricher { * @param attributes instance of {@link CloudEventContext} * @return modified {@link CloudEventContext} */ + /** + * Will provide access to an open instance of {@link CloudEventMessageBuilder} so you + * can add additional attributes and headers. + * @param messageBuilder open instance of {@link CloudEventMessageBuilder} + * @return instance of {@link CloudEventMessageBuilder} + */ CloudEventMessageBuilder enrich(CloudEventMessageBuilder messageBuilder); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index a7a245389..75e14ce93 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -36,7 +36,7 @@ import org.springframework.util.StringUtils; /** * Message builder which is aware of Cloud Event semantics. * It provides type-safe setters for v1.0 Cloud Event attributes while - * supporting any version by exposing a convenient + * supporting all other versions via convenient * {@link #setHeader(String, Object)} method. * * @author Oleg Zhurakousky @@ -141,28 +141,42 @@ public final class CloudEventMessageBuilder { return Collections.unmodifiableMap(this.headers); } + /** + * Will build the message ensuring that the Cloud Event attributes are all + * prefixed with the prefix determined by the framework. If you want to + * use a specific prefix please use {@link #build(String)} method. + * @return instance of {@link Message} + */ public Message build() { return this.doBuild(CloudEventMessageUtils.determinePrefixToUse(this.headers)); } + /** + * Will build the message ensuring that the Cloud Event attributes are + * prefixed with the 'attributePrefixToUse'. + * + * @param attributePrefixToUse prefix to use for attributes + * @return instance of {@link Message} + */ public Message build(String attributePrefixToUse) { + Assert.isTrue(attributePrefixToUse.equals(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX) + || attributePrefixToUse.equals(CloudEventMessageUtils.KAFKA_ATTR_PREFIX) + || attributePrefixToUse.equals(CloudEventMessageUtils.AMQP_ATTR_PREFIX), "Supported prefixes are " + + CloudEventMessageUtils.DEFAULT_ATTR_PREFIX + + ", " + CloudEventMessageUtils.KAFKA_ATTR_PREFIX + + ", " + CloudEventMessageUtils.AMQP_ATTR_PREFIX + + ". Was " + attributePrefixToUse); if (StringUtils.hasText(attributePrefixToUse)) { String[] keys = this.headers.keySet().toArray(new String[] {}); for (String key : keys) { if (key.startsWith(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX)) { - Object value = headers.remove(key); - key = key.substring(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX.length()); - this.headers.put(attributePrefixToUse + key, value); + this.swapPrefix(key, CloudEventMessageUtils.DEFAULT_ATTR_PREFIX, attributePrefixToUse); } else if (key.startsWith(CloudEventMessageUtils.AMQP_ATTR_PREFIX)) { - Object value = headers.remove(key); - key = key.substring(CloudEventMessageUtils.AMQP_ATTR_PREFIX.length()); - this.headers.put(attributePrefixToUse + key, value); + this.swapPrefix(key, CloudEventMessageUtils.AMQP_ATTR_PREFIX, attributePrefixToUse); } else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) { - Object value = headers.remove(key); - key = key.substring(CloudEventMessageUtils.KAFKA_ATTR_PREFIX.length()); - this.headers.put(attributePrefixToUse + key, value); + this.swapPrefix(key, CloudEventMessageUtils.KAFKA_ATTR_PREFIX, attributePrefixToUse); } } } @@ -173,6 +187,12 @@ public final class CloudEventMessageBuilder { return doBuild(prefix); } + private void swapPrefix(String key, String currentPrefix, String newPrefix) { + Object value = headers.remove(key); + key = key.substring(currentPrefix.length()); + this.headers.put(newPrefix + key, value); + } + private Message doBuild(String prefix) { if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SPECVERSION)) { this.headers.put(prefix + CloudEventMessageUtils._SPECVERSION, "1.0");