This commit is contained in:
Oleg Zhurakousky
2020-12-02 12:45:45 +01:00
parent 98c9d56442
commit 4c69ca1cd6
3 changed files with 70 additions and 25 deletions

View File

@@ -58,9 +58,9 @@ public final class CloudEventMessageBuilder<T> {
}
@SuppressWarnings("unchecked")
public static <T> CloudEventMessageBuilder<T> fromMessage(Message<?> message) {
public static <T> CloudEventMessageBuilder<T> fromMessage(Message<T> message) {
CloudEventMessageBuilder<T> builder = new CloudEventMessageBuilder<T>(new HashMap<>(message.getHeaders()));
builder.data = (T) message.getPayload();
builder.data = message.getPayload();
return builder;
}
@@ -169,8 +169,9 @@ public final class CloudEventMessageBuilder<T> {
}
}
if (!this.headers.containsKey(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION)) {
this.headers.put(attributePrefixToUse + CloudEventMessageUtils.SPECVERSION, "1.0");
if (!this.headers.containsKey(attributePrefixToUse + "specversion")) {
String prefix = StringUtils.hasText(attributePrefixToUse) ? attributePrefixToUse : CloudEventMessageUtils.DEFAULT_ATTR_PREFIX;
this.headers.put(prefix + CloudEventMessageUtils._SPECVERSION, "1.0");
}
return doBuild();
}

View File

@@ -21,6 +21,7 @@ import java.net.URI;
import java.time.OffsetTime;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -56,6 +57,28 @@ public final class CloudEventMessageUtils {
private CloudEventMessageUtils() {
}
//=====
static String _DATA = "data";
static String _ID = "id";
static String _SOURCE = "source";
static String _SPECVERSION = "specversion";
static String _TYPE = "type";
static String _DATACONTENTTYPE = "datacontenttype";
static String _DATASCHEMA = "dataschema";
static String _SCHEMAURL = "schemaurl";
static String _SUBJECT = "subject";
static String _TIME = "time";
/**
* String value of 'application/cloudevents' mime type.
*/
@@ -84,52 +107,53 @@ public final class CloudEventMessageUtils {
/**
* Value for 'data' attribute.
*/
public static String DATA = DEFAULT_ATTR_PREFIX + "data";
public static String DATA = DEFAULT_ATTR_PREFIX + _DATA;
/**
* Value for 'id' attribute.
*/
public static String ID = DEFAULT_ATTR_PREFIX + "id";
public static String ID = DEFAULT_ATTR_PREFIX + _ID;
/**
* Value for 'source' attribute.
*/
public static String SOURCE = DEFAULT_ATTR_PREFIX + "source";
public static String SOURCE = DEFAULT_ATTR_PREFIX + _SOURCE;
/**
* Value for 'specversion' attribute.
*/
public static String SPECVERSION = DEFAULT_ATTR_PREFIX + "specversion";
public static String SPECVERSION = DEFAULT_ATTR_PREFIX + _SPECVERSION;
/**
* Value for 'type' attribute.
*/
public static String TYPE = DEFAULT_ATTR_PREFIX + "type";
public static String TYPE = DEFAULT_ATTR_PREFIX + _TYPE;
/**
* Value for 'datacontenttype' attribute.
*/
public static String DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + "datacontenttype";
public static String DATACONTENTTYPE = DEFAULT_ATTR_PREFIX + _DATACONTENTTYPE;
/**
* Value for 'dataschema' attribute.
*/
public static String DATASCHEMA = DEFAULT_ATTR_PREFIX + "dataschema";
public static String DATASCHEMA = DEFAULT_ATTR_PREFIX + _DATASCHEMA;
/**
* V03 name for 'dataschema' attribute.
*/
public static final String SCHEMAURL = DEFAULT_ATTR_PREFIX + "schemaurl";
public static final String SCHEMAURL = DEFAULT_ATTR_PREFIX + _SCHEMAURL;
/**
* Value for 'subject' attribute.
*/
public static String SUBJECT = DEFAULT_ATTR_PREFIX + "subject";
public static String SUBJECT = DEFAULT_ATTR_PREFIX + _SUBJECT;
/**
* Value for 'time' attribute.
*/
public static String TIME = DEFAULT_ATTR_PREFIX + "time";
public static String TIME = DEFAULT_ATTR_PREFIX + _TIME;
public static String getId(Message<?> message) {
if (message.getHeaders().containsKey("_id")) {
@@ -141,41 +165,52 @@ public final class CloudEventMessageUtils {
public static URI getSource(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return safeGetURI(message.getHeaders(), prefix + SOURCE);
return safeGetURI(message.getHeaders(), prefix + _SOURCE);
}
public static String getSpecVersion(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + SPECVERSION);
return (String) message.getHeaders().get(prefix + _SPECVERSION);
}
public static String getType(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + TYPE);
return (String) message.getHeaders().get(prefix + _TYPE);
}
public static String getDataContentType(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + DATACONTENTTYPE);
return (String) message.getHeaders().get(prefix + _DATACONTENTTYPE);
}
public static URI getDataSchema(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return safeGetURI(message.getHeaders(), prefix + DATASCHEMA);
return safeGetURI(message.getHeaders(), prefix + _DATASCHEMA);
}
public static String getSubject(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + SUBJECT);
return (String) message.getHeaders().get(prefix + _SUBJECT);
}
public static OffsetTime getTime(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (OffsetTime) message.getHeaders().get(prefix + TIME);
return (OffsetTime) message.getHeaders().get(prefix + _TIME);
}
@SuppressWarnings("unchecked")
protected static Message<?> toCanonical(Message<?> inputMessage, MessageConverter messageConverter) {
public static <T> T getData(Message<?> message) {
return (T) message.getPayload();
}
public static Map<String, Object> getAttributes(Message<?> message) {
return message.getHeaders().entrySet().stream()
.filter(e -> isAttribute(e.getKey()))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}
@SuppressWarnings("unchecked")
static Message<?> toCanonical(Message<?> inputMessage, MessageConverter messageConverter) {
Map<String, Object> headers = (Map<String, Object>) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders());
canonicalizeHeaders(headers, false);
@@ -221,9 +256,12 @@ public final class CloudEventMessageUtils {
* @param messageHeaders map of message headers
* @return prefix (e.g., 'ce_' or 'ce-' etc.)
*/
protected static String determinePrefixToUse(Map<String, Object> messageHeaders) {
static String determinePrefixToUse(Map<String, Object> messageHeaders) {
for (String key : messageHeaders.keySet()) {
if (key.startsWith(KAFKA_ATTR_PREFIX)) {
if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
return DEFAULT_ATTR_PREFIX;
}
else if (key.startsWith(KAFKA_ATTR_PREFIX)) {
return KAFKA_ATTR_PREFIX;
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
@@ -239,12 +277,16 @@ public final class CloudEventMessageUtils {
* @param message input {@link Message}
* @return true if this Message represents Cloud Event in binary-mode
*/
protected static boolean isCloudEvent(Message<?> message) {
static boolean isCloudEvent(Message<?> message) {
return message.getHeaders().containsKey(SPECVERSION)
&& message.getHeaders().containsKey(TYPE)
&& message.getHeaders().containsKey(SOURCE);
}
private static boolean isAttribute(String key) {
return key.startsWith(DEFAULT_ATTR_PREFIX) || key.startsWith(AMQP_ATTR_PREFIX) || key.startsWith(KAFKA_ATTR_PREFIX);
}
/**
* Will canonicalize Cloud Event attributes (headers) by removing well known prefixes.
* So, for example 'ce_source' will become 'source'.

View File

@@ -32,6 +32,8 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
/**
* Implementation of {@link FunctionInvocationHelper} to support Cloud Events.
* This is a primary (and the only) integration bridge with {@link FunctionInvocationHelper}.
*
* @author Oleg Zhurakousky
* @since 2.0