Consolidate Cloud Events attribute prefix swap logic
This commit is contained in:
@@ -16,8 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.function.cloudevent;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Strategy that should be implemented by the user to help with outgoing Cloud Event
|
||||
* headers. <br>
|
||||
@@ -32,15 +30,16 @@ package org.springframework.cloud.function.cloudevent;
|
||||
*
|
||||
* <pre>
|
||||
* @Bean
|
||||
* public CloudEventHeadersProvider cloudEventHeadersProvider() {
|
||||
* return attributes ->
|
||||
* CloudEventHeaderUtils.fromAttributes(attributes).withSource("https://interface21.com/").withType("com.interface21").build();
|
||||
* public CloudEventHeaderEnricher cloudEventHeaderEnricher() {
|
||||
* return headers -> {
|
||||
* return headers.setSource("https://interface21.com/").setType("com.interface21");
|
||||
* };
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Dave Syer
|
||||
* @since 2.0
|
||||
* @since 3.1
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface CloudEventHeaderEnricher {
|
||||
@@ -49,6 +48,12 @@ public interface CloudEventHeaderEnricher {
|
||||
* @param attributes instance of {@link CloudEventContext}
|
||||
* @return modified {@link CloudEventContext}
|
||||
*/
|
||||
/**
|
||||
* Will provide access to an open instance of {@link CloudEventMessageBuilder} so you
|
||||
* can add additional attributes and headers.
|
||||
* @param messageBuilder open instance of {@link CloudEventMessageBuilder}
|
||||
* @return instance of {@link CloudEventMessageBuilder}
|
||||
*/
|
||||
CloudEventMessageBuilder<?> enrich(CloudEventMessageBuilder<?> messageBuilder);
|
||||
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ import org.springframework.util.StringUtils;
|
||||
/**
|
||||
* Message builder which is aware of Cloud Event semantics.
|
||||
* It provides type-safe setters for v1.0 Cloud Event attributes while
|
||||
* supporting any version by exposing a convenient
|
||||
* supporting all other versions via convenient
|
||||
* {@link #setHeader(String, Object)} method.
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
@@ -141,28 +141,42 @@ public final class CloudEventMessageBuilder<T> {
|
||||
return Collections.unmodifiableMap(this.headers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Will build the message ensuring that the Cloud Event attributes are all
|
||||
* prefixed with the prefix determined by the framework. If you want to
|
||||
* use a specific prefix please use {@link #build(String)} method.
|
||||
* @return instance of {@link Message}
|
||||
*/
|
||||
public Message<T> build() {
|
||||
return this.doBuild(CloudEventMessageUtils.determinePrefixToUse(this.headers));
|
||||
}
|
||||
|
||||
/**
|
||||
* Will build the message ensuring that the Cloud Event attributes are
|
||||
* prefixed with the 'attributePrefixToUse'.
|
||||
*
|
||||
* @param attributePrefixToUse prefix to use for attributes
|
||||
* @return instance of {@link Message}
|
||||
*/
|
||||
public Message<T> build(String attributePrefixToUse) {
|
||||
Assert.isTrue(attributePrefixToUse.equals(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX)
|
||||
|| attributePrefixToUse.equals(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)
|
||||
|| attributePrefixToUse.equals(CloudEventMessageUtils.AMQP_ATTR_PREFIX), "Supported prefixes are "
|
||||
+ CloudEventMessageUtils.DEFAULT_ATTR_PREFIX
|
||||
+ ", " + CloudEventMessageUtils.KAFKA_ATTR_PREFIX
|
||||
+ ", " + CloudEventMessageUtils.AMQP_ATTR_PREFIX
|
||||
+ ". Was " + attributePrefixToUse);
|
||||
if (StringUtils.hasText(attributePrefixToUse)) {
|
||||
String[] keys = this.headers.keySet().toArray(new String[] {});
|
||||
for (String key : keys) {
|
||||
if (key.startsWith(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX.length());
|
||||
this.headers.put(attributePrefixToUse + key, value);
|
||||
this.swapPrefix(key, CloudEventMessageUtils.DEFAULT_ATTR_PREFIX, attributePrefixToUse);
|
||||
}
|
||||
else if (key.startsWith(CloudEventMessageUtils.AMQP_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(CloudEventMessageUtils.AMQP_ATTR_PREFIX.length());
|
||||
this.headers.put(attributePrefixToUse + key, value);
|
||||
this.swapPrefix(key, CloudEventMessageUtils.AMQP_ATTR_PREFIX, attributePrefixToUse);
|
||||
}
|
||||
else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(CloudEventMessageUtils.KAFKA_ATTR_PREFIX.length());
|
||||
this.headers.put(attributePrefixToUse + key, value);
|
||||
this.swapPrefix(key, CloudEventMessageUtils.KAFKA_ATTR_PREFIX, attributePrefixToUse);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -173,6 +187,12 @@ public final class CloudEventMessageBuilder<T> {
|
||||
return doBuild(prefix);
|
||||
}
|
||||
|
||||
private void swapPrefix(String key, String currentPrefix, String newPrefix) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(currentPrefix.length());
|
||||
this.headers.put(newPrefix + key, value);
|
||||
}
|
||||
|
||||
private Message<T> doBuild(String prefix) {
|
||||
if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SPECVERSION)) {
|
||||
this.headers.put(prefix + CloudEventMessageUtils._SPECVERSION, "1.0");
|
||||
|
||||
Reference in New Issue
Block a user