Change canonical format of Cloud Event Message attributes to have 'ce-' prefix

This commit is contained in:
Oleg Zhurakousky
2020-12-02 11:24:03 +01:00
parent 0cdcc46f57
commit 523cd1023f
6 changed files with 108 additions and 102 deletions

View File

@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.StringUtils;
/**
* Message builder which is aware of Cloud Event semantics.
@@ -145,16 +146,33 @@ public final class CloudEventMessageBuilder<T> {
return this.doBuild();
}
public Message<T> build(String attributePrefixToUse) {
String[] keys = this.headers.keySet().toArray(new String[] {});
for (String key : keys) {
Object value = this.headers.remove(key);
this.headers.put(attributePrefixToUse + key, value);
if (StringUtils.hasText(attributePrefixToUse)) {
String[] keys = this.headers.keySet().toArray(new String[] {});
for (String key : keys) {
if (key.startsWith(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX.length());
headers.put(attributePrefixToUse + key, value);
}
else if (key.startsWith(CloudEventMessageUtils.AMQP_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(CloudEventMessageUtils.AMQP_ATTR_PREFIX.length());
headers.put(attributePrefixToUse + key, value);
}
else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(CloudEventMessageUtils.KAFKA_ATTR_PREFIX.length());
headers.put(attributePrefixToUse + key, value);
}
}
}
if (!this.headers.containsKey(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION)) {
this.headers.put(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION, "1.0");
}
return build();
return doBuild();
}
private Message<T> doBuild() {

View File

@@ -21,7 +21,6 @@ import java.net.URI;
import java.time.OffsetTime;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -48,8 +47,13 @@ public final class CloudEventMessageUtils {
private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
private CloudEventMessageUtils() {
private static Field MESSAGE_HEADERS = ReflectionUtils.findField(MessageHeaders.class, "headers");
static {
MESSAGE_HEADERS.setAccessible(true);
}
private CloudEventMessageUtils() {
}
/**
@@ -65,7 +69,7 @@ public final class CloudEventMessageUtils {
/**
* Prefix for attributes.
*/
public static String DEFAULT_ATTR_PREFIX = "ce_";
public static String DEFAULT_ATTR_PREFIX = "ce-";
/**
* AMQP attributes prefix.
@@ -75,57 +79,57 @@ public final class CloudEventMessageUtils {
/**
* Prefix for attributes.
*/
public static String HTTP_ATTR_PREFIX = "ce-";
public static String KAFKA_ATTR_PREFIX = "ce_";
/**
* Value for 'data' attribute.
*/
public static String DATA = "data";
public static String DATA = DEFAULT_ATTR_PREFIX + "data";
/**
* Value for 'id' attribute.
*/
public static String ID = "id";
public static String ID = DEFAULT_ATTR_PREFIX + "id";
/**
* Value for 'source' attribute.
*/
public static String SOURCE = "source";
public static String SOURCE = DEFAULT_ATTR_PREFIX + "source";
/**
* Value for 'specversion' attribute.
*/
public static String SPECVERSION = "specversion";
public static String SPECVERSION = DEFAULT_ATTR_PREFIX + "specversion";
/**
* Value for 'type' attribute.
*/
public static String TYPE = "type";
public static String TYPE = DEFAULT_ATTR_PREFIX + "type";
/**
* Value for 'datacontenttype' attribute.
*/
public static String DATACONTENTTYPE = "datacontenttype";
public static String DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + "datacontenttype";
/**
* Value for 'dataschema' attribute.
*/
public static String DATASCHEMA = "dataschema";
public static String DATASCHEMA = DEFAULT_ATTR_PREFIX + "dataschema";
/**
* V03 name for 'dataschema' attribute.
*/
public static final String SCHEMAURL = "schemaurl";
public static final String SCHEMAURL = DEFAULT_ATTR_PREFIX + "schemaurl";
/**
* Value for 'subject' attribute.
*/
public static String SUBJECT = "subject";
public static String SUBJECT = DEFAULT_ATTR_PREFIX + "subject";
/**
* Value for 'time' attribute.
*/
public static String TIME = "time";
public static String TIME = DEFAULT_ATTR_PREFIX + "time";
public static String getId(Message<?> message) {
if (message.getHeaders().containsKey("_id")) {
@@ -171,16 +175,13 @@ public final class CloudEventMessageUtils {
}
@SuppressWarnings("unchecked")
protected static Message<?> toCannonical(Message<?> inputMessage, MessageConverter messageConverter) {
Field headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
headersField.setAccessible(true);
Map<String, Object> headers = (Map<String, Object>) ReflectionUtils.getField(headersField, inputMessage.getHeaders());
canonicalizeHeaders(headers);
protected static Message<?> toCanonical(Message<?> inputMessage, MessageConverter messageConverter) {
Map<String, Object> headers = (Map<String, Object>) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders());
canonicalizeHeaders(headers, false);
String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE);
// first check the obvious and see if content-type is `cloudevents`
if (!isBinary(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
if (!isCloudEvent(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders());
if (contentType.getType().equals(APPLICATION_CLOUDEVENTS.getType()) && contentType
.getSubtype().startsWith(APPLICATION_CLOUDEVENTS.getSubtype())) {
@@ -197,7 +198,7 @@ public final class CloudEventMessageUtils {
Map<String, Object> structuredCloudEvent = (Map<String, Object>) messageConverter
.fromMessage(cloudEventMessage, Map.class);
canonicalizeHeaders(structuredCloudEvent);
canonicalizeHeaders(structuredCloudEvent, true);
Message<?> binaryCeMessage = buildBinaryMessageFromStructuredMap(structuredCloudEvent,
inputMessage.getHeaders());
@@ -221,30 +222,14 @@ public final class CloudEventMessageUtils {
* @return prefix (e.g., 'ce_' or 'ce-' etc.)
*/
protected static String determinePrefixToUse(Map<String, Object> messageHeaders) {
Set<String> keys = messageHeaders.keySet();
if (keys.contains("user-agent")) {
return HTTP_ATTR_PREFIX;
}
else {
for (String key : messageHeaders.keySet()) {
if (key.startsWith("kafka_")) {
return DEFAULT_ATTR_PREFIX;
}
else if (key.startsWith("amqp_")) {
return AMQP_ATTR_PREFIX;
}
else if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
return DEFAULT_ATTR_PREFIX;
}
else if (key.startsWith(HTTP_ATTR_PREFIX)) {
return HTTP_ATTR_PREFIX;
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
return AMQP_ATTR_PREFIX;
}
for (String key : messageHeaders.keySet()) {
if (key.startsWith(KAFKA_ATTR_PREFIX)) {
return KAFKA_ATTR_PREFIX;
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
return AMQP_ATTR_PREFIX;
}
}
return "";
}
@@ -254,7 +239,7 @@ public final class CloudEventMessageUtils {
* @param message input {@link Message}
* @return true if this Message represents Cloud Event in binary-mode
*/
protected static boolean isBinary(Message<?> message) {
protected static boolean isCloudEvent(Message<?> message) {
return message.getHeaders().containsKey(SPECVERSION)
&& message.getHeaders().containsKey(TYPE)
&& message.getHeaders().containsKey(SOURCE);
@@ -265,23 +250,27 @@ public final class CloudEventMessageUtils {
* So, for example 'ce_source' will become 'source'.
* @param headers message headers
*/
private static void canonicalizeHeaders(Map<String, Object> headers) {
private static void canonicalizeHeaders(Map<String, Object> headers, boolean structured) {
String[] keys = headers.keySet().toArray(new String[] {});
for (String key : keys) {
if (key.startsWith(HTTP_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(HTTP_ATTR_PREFIX.length());
headers.put(key, value);
}
else if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(DEFAULT_ATTR_PREFIX.length());
headers.put(key, value);
headers.put(DEFAULT_ATTR_PREFIX + key, value);
}
else if (key.startsWith(KAFKA_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(KAFKA_ATTR_PREFIX.length());
headers.put(DEFAULT_ATTR_PREFIX + key, value);
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(AMQP_ATTR_PREFIX.length());
headers.put(key, value);
headers.put(DEFAULT_ATTR_PREFIX + key, value);
}
else if (structured) {
Object value = headers.remove(key);
headers.put(DEFAULT_ATTR_PREFIX + key, value);
}
}
}

View File

@@ -57,13 +57,13 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
@Override
public Message<?> preProcessInput(Message<?> input, Object inputConverter) {
return CloudEventMessageUtils.toCannonical(input, (MessageConverter) inputConverter);
return CloudEventMessageUtils.toCanonical(input, (MessageConverter) inputConverter);
}
@Override
public Message<?> postProcessResult(Message<?> input, Object result) {
Message<?> resultMessage = null;
if (CloudEventMessageUtils.isBinary(input)) {
if (CloudEventMessageUtils.isCloudEvent(input)) {
CloudEventMessageBuilder<?> messageBuilder = CloudEventMessageBuilder
.withData(result)
.setId(UUID.randomUUID().toString())
@@ -75,6 +75,7 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
}
String prefix = this.determineOutputPrefix(input);
resultMessage = messageBuilder.build(prefix);
}
else {

View File

@@ -31,6 +31,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
@@ -56,7 +57,7 @@ public class CloudEventFunctionTests {
.build();
assertThat(inputMessage.getHeaders().getId()).isEqualTo(UUID.fromString(id));
assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue();
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue();
Message<Person> resultMessage = (Message<Person>) function.apply(inputMessage);
@@ -66,7 +67,7 @@ public class CloudEventFunctionTests {
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
@@ -95,7 +96,7 @@ public class CloudEventFunctionTests {
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(Person.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
@@ -117,12 +118,12 @@ public class CloudEventFunctionTests {
"}";
Function<Object, Object> function = this.lookup("springRelease", TestConfiguration.class);
Message<String> inputMessage = CloudEventMessageBuilder
.withData(payload)
Message<String> inputMessage = MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.build();
assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse();
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse();
Message<SpringReleaseEvent> resultMessage = (Message<SpringReleaseEvent>) function.apply(inputMessage);
assertThat(resultMessage.getPayload().getReleaseDate())
@@ -133,7 +134,7 @@ public class CloudEventFunctionTests {
// * both on input and output that it is dealing with Cloud Event and generates
// * appropriate headers/attributes
// */
assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
@@ -158,7 +159,7 @@ public class CloudEventFunctionTests {
.withData(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.build();
assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isFalse();
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse();
Message<SpringReleaseEvent> resultMessage = (Message<SpringReleaseEvent>) function.apply(inputMessage);
assertThat(resultMessage.getPayload().getReleaseDate())
@@ -169,7 +170,7 @@ public class CloudEventFunctionTests {
* both on input and output that it is dealing with Cloud Event and generates
* appropriate headers/attributes
*/
assertThat(CloudEventMessageUtils.isBinary(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}