Minor improvements and clean up Cloud Event package

This commit is contained in:
Oleg Zhurakousky
2020-12-03 20:31:17 +01:00
parent 0f0c3a87e5
commit bdcb232e42
3 changed files with 44 additions and 48 deletions

View File

@@ -180,11 +180,7 @@ public final class CloudEventMessageBuilder<T> {
}
}
}
String prefix = StringUtils.hasText(attributePrefixToUse)
? attributePrefixToUse
: CloudEventMessageUtils.DEFAULT_ATTR_PREFIX;
return doBuild(prefix);
return doBuild(attributePrefixToUse);
}
private void swapPrefix(String key, String currentPrefix, String newPrefix) {
@@ -201,7 +197,7 @@ public final class CloudEventMessageBuilder<T> {
this.headers.put(prefix + CloudEventMessageUtils._ID, UUID.randomUUID().toString());
}
this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, null, null);
MessageHeaders headers = new MessageHeaders(this.headers);
GenericMessage<T> message = new GenericMessage<T>(this.data, headers);
Assert.hasText(CloudEventMessageUtils.getSpecVersion(message), "'specversion' must not be null or empty");
Assert.notNull(CloudEventMessageUtils.getSource(message), "'source' must not be null");
@@ -209,13 +205,4 @@ public final class CloudEventMessageBuilder<T> {
Assert.hasText(CloudEventMessageUtils.getId(message), "'id' must not be null or empty");
return message;
}
private static class CloudEventMessageHeaders extends MessageHeaders {
private static final long serialVersionUID = -6424866731588545945L;
protected CloudEventMessageHeaders(Map<String, Object> headers, UUID id, Long timestamp) {
super(headers, id, timestamp);
}
}
}

View File

@@ -162,9 +162,6 @@ public final class CloudEventMessageUtils {
public static String getId(Message<?> message) {
// if (message.getHeaders().containsKey("_id")) {
// return (String) message.getHeaders().get("_id");
// }
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + MessageHeaders.ID);
}
@@ -215,6 +212,13 @@ public final class CloudEventMessageUtils {
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}
/**
* This method does several things.
* First in canonicalizes Cloud Events attributes ensuring that they all prefixed
* with 'ce-' prefix regardless where they came from.
* It also transforms structured-mode Cloud Event to binary-mode and then it canonicalizes attributes
* as well as described in the previous sentence.
*/
@SuppressWarnings("unchecked")
static Message<?> toCanonical(Message<?> inputMessage, MessageConverter messageConverter) {
Map<String, Object> headers = (Map<String, Object>) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders());
@@ -223,6 +227,7 @@ public final class CloudEventMessageUtils {
String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE);
// first check the obvious and see if content-type is `cloudevents`
if (!isCloudEvent(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
// structured-mode
MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders());
if (contentType.getType().equals(APPLICATION_CLOUDEVENTS.getType()) && contentType
.getSubtype().startsWith(APPLICATION_CLOUDEVENTS.getSubtype())) {
@@ -240,13 +245,12 @@ public final class CloudEventMessageUtils {
.fromMessage(cloudEventMessage, Map.class);
canonicalizeHeaders(structuredCloudEvent, true);
Message<?> binaryCeMessage = buildBinaryMessageFromStructuredMap(structuredCloudEvent,
return buildBinaryMessageFromStructuredMap(structuredCloudEvent,
inputMessage.getHeaders());
return binaryCeMessage;
}
}
else if (StringUtils.hasText(inputContentType)) { // this needs thinking since . .
else if (StringUtils.hasText(inputContentType)) {
// binary-mode, but DATACONTENTTYPE was specified explicitly so we set it as CT to ensure proper message converters are used.
return MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, inputContentType)
.build();
}
@@ -256,24 +260,40 @@ public final class CloudEventMessageUtils {
/**
* Determines attribute prefix based on the presence of certain well defined headers.
*
* TODO work in progress as it needs to be refined
*
* @param messageHeaders map of message headers
* @return prefix (e.g., 'ce_' or 'ce-' etc.)
*/
static String determinePrefixToUse(Map<String, Object> messageHeaders) {
for (String key : messageHeaders.keySet()) {
if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
return DEFAULT_ATTR_PREFIX;
String targetProtocol = (String) messageHeaders.get(MessageUtils.TARGET_PROTOCOL);
if (StringUtils.hasText(targetProtocol)) {
if ("kafka".equals(targetProtocol)) {
return CloudEventMessageUtils.KAFKA_ATTR_PREFIX;
}
else if (key.startsWith(KAFKA_ATTR_PREFIX)) {
return KAFKA_ATTR_PREFIX;
else if ("amqp".equals(targetProtocol)) {
return CloudEventMessageUtils.AMQP_ATTR_PREFIX;
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
return AMQP_ATTR_PREFIX;
else if ("http".equals(targetProtocol)) {
return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX;
}
else {
throw new IllegalArgumentException("Provided TARGET_PROTOCOL is not suported: " + targetProtocol + ". "
+ "Supported protoclos are, 'kafka', 'amqp' and 'http'");
}
}
else {
for (String key : messageHeaders.keySet()) {
if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
return DEFAULT_ATTR_PREFIX;
}
else if (key.startsWith(KAFKA_ATTR_PREFIX)) {
return KAFKA_ATTR_PREFIX;
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
return AMQP_ATTR_PREFIX;
}
}
}
return "";
}

View File

@@ -20,6 +20,7 @@ import java.net.URI;
import java.util.UUID;
import org.springframework.beans.BeansException;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.context.ApplicationContext;
@@ -34,7 +35,7 @@ import org.springframework.util.StringUtils;
/**
* Implementation of {@link FunctionInvocationHelper} to support Cloud Events.
* This is a primary (and the only) integration bridge with {@link FunctionInvocationHelper}.
* This is a primary (and the only) integration bridge with {@link FunctionInvocationWrapper}.
*
* @author Oleg Zhurakousky
* @since 3.1
@@ -52,11 +53,8 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
@Override
public boolean isRetainOuputAsMessage(Message<?> message) {
if (message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE)
&& message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE)) {
return true;
}
return false;
return message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE)
&& message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE);
}
@Override
@@ -84,9 +82,7 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder);
}
String prefix = this.determineOutputPrefix(input);
resultMessage = messageBuilder.build(prefix);
resultMessage = messageBuilder.build(CloudEventMessageUtils.determinePrefixToUse(input.getHeaders()));
}
else if (!(result instanceof Message<?>)) {
resultMessage = MessageBuilder.withPayload(result).build();
@@ -100,13 +96,6 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
private String determineOutputPrefix(Message<?> input) {
/*
* TODO rework to actually figure out where output goes instead of relying on input
* In streams we can overrode and access output binding, ect.
*/
return CloudEventMessageUtils.determinePrefixToUse(input.getHeaders());
}
private String getApplicationName() {
ConfigurableEnvironment environment = this.applicationContext.getEnvironment();