diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index 75e14ce93..d3d4bc0a9 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -180,11 +180,7 @@ public final class CloudEventMessageBuilder { } } } - - String prefix = StringUtils.hasText(attributePrefixToUse) - ? attributePrefixToUse - : CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; - return doBuild(prefix); + return doBuild(attributePrefixToUse); } private void swapPrefix(String key, String currentPrefix, String newPrefix) { @@ -201,7 +197,7 @@ public final class CloudEventMessageBuilder { this.headers.put(prefix + CloudEventMessageUtils._ID, UUID.randomUUID().toString()); } this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); - CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, null, null); + MessageHeaders headers = new MessageHeaders(this.headers); GenericMessage message = new GenericMessage(this.data, headers); Assert.hasText(CloudEventMessageUtils.getSpecVersion(message), "'specversion' must not be null or empty"); Assert.notNull(CloudEventMessageUtils.getSource(message), "'source' must not be null"); @@ -209,13 +205,4 @@ public final class CloudEventMessageBuilder { Assert.hasText(CloudEventMessageUtils.getId(message), "'id' must not be null or empty"); return message; } - - private static class CloudEventMessageHeaders extends MessageHeaders { - - private static final long serialVersionUID = -6424866731588545945L; - - protected CloudEventMessageHeaders(Map headers, UUID id, Long timestamp) { - super(headers, id, timestamp); - } - } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 805f7fefb..ca6b2cc76 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -162,9 +162,6 @@ public final class CloudEventMessageUtils { public static String getId(Message message) { -// if (message.getHeaders().containsKey("_id")) { -// return (String) message.getHeaders().get("_id"); -// } String prefix = determinePrefixToUse(message.getHeaders()); return (String) message.getHeaders().get(prefix + MessageHeaders.ID); } @@ -215,6 +212,13 @@ public final class CloudEventMessageUtils { .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); } + /** + * This method does several things. + * First in canonicalizes Cloud Events attributes ensuring that they all prefixed + * with 'ce-' prefix regardless where they came from. + * It also transforms structured-mode Cloud Event to binary-mode and then it canonicalizes attributes + * as well as described in the previous sentence. + */ @SuppressWarnings("unchecked") static Message toCanonical(Message inputMessage, MessageConverter messageConverter) { Map headers = (Map) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders()); @@ -223,6 +227,7 @@ public final class CloudEventMessageUtils { String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE); // first check the obvious and see if content-type is `cloudevents` if (!isCloudEvent(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { + // structured-mode MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders()); if (contentType.getType().equals(APPLICATION_CLOUDEVENTS.getType()) && contentType .getSubtype().startsWith(APPLICATION_CLOUDEVENTS.getSubtype())) { @@ -240,13 +245,12 @@ public final class CloudEventMessageUtils { .fromMessage(cloudEventMessage, Map.class); canonicalizeHeaders(structuredCloudEvent, true); - Message binaryCeMessage = buildBinaryMessageFromStructuredMap(structuredCloudEvent, + return buildBinaryMessageFromStructuredMap(structuredCloudEvent, inputMessage.getHeaders()); - - return binaryCeMessage; } } - else if (StringUtils.hasText(inputContentType)) { // this needs thinking since . . + else if (StringUtils.hasText(inputContentType)) { + // binary-mode, but DATACONTENTTYPE was specified explicitly so we set it as CT to ensure proper message converters are used. return MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, inputContentType) .build(); } @@ -256,24 +260,40 @@ public final class CloudEventMessageUtils { /** * Determines attribute prefix based on the presence of certain well defined headers. - * - * TODO work in progress as it needs to be refined - * * @param messageHeaders map of message headers * @return prefix (e.g., 'ce_' or 'ce-' etc.) */ static String determinePrefixToUse(Map messageHeaders) { - for (String key : messageHeaders.keySet()) { - if (key.startsWith(DEFAULT_ATTR_PREFIX)) { - return DEFAULT_ATTR_PREFIX; + String targetProtocol = (String) messageHeaders.get(MessageUtils.TARGET_PROTOCOL); + if (StringUtils.hasText(targetProtocol)) { + if ("kafka".equals(targetProtocol)) { + return CloudEventMessageUtils.KAFKA_ATTR_PREFIX; } - else if (key.startsWith(KAFKA_ATTR_PREFIX)) { - return KAFKA_ATTR_PREFIX; + else if ("amqp".equals(targetProtocol)) { + return CloudEventMessageUtils.AMQP_ATTR_PREFIX; } - else if (key.startsWith(AMQP_ATTR_PREFIX)) { - return AMQP_ATTR_PREFIX; + else if ("http".equals(targetProtocol)) { + return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; + } + else { + throw new IllegalArgumentException("Provided TARGET_PROTOCOL is not suported: " + targetProtocol + ". " + + "Supported protoclos are, 'kafka', 'amqp' and 'http'"); } } + else { + for (String key : messageHeaders.keySet()) { + if (key.startsWith(DEFAULT_ATTR_PREFIX)) { + return DEFAULT_ATTR_PREFIX; + } + else if (key.startsWith(KAFKA_ATTR_PREFIX)) { + return KAFKA_ATTR_PREFIX; + } + else if (key.startsWith(AMQP_ATTR_PREFIX)) { + return AMQP_ATTR_PREFIX; + } + } + } + return ""; } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java index c506e11bb..6b56840b3 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.UUID; import org.springframework.beans.BeansException; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.context.ApplicationContext; @@ -34,7 +35,7 @@ import org.springframework.util.StringUtils; /** * Implementation of {@link FunctionInvocationHelper} to support Cloud Events. - * This is a primary (and the only) integration bridge with {@link FunctionInvocationHelper}. + * This is a primary (and the only) integration bridge with {@link FunctionInvocationWrapper}. * * @author Oleg Zhurakousky * @since 3.1 @@ -52,11 +53,8 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper message) { - if (message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE) - && message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE)) { - return true; - } - return false; + return message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE) + && message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE); } @Override @@ -84,9 +82,7 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper)) { resultMessage = MessageBuilder.withPayload(result).build(); @@ -100,13 +96,6 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper input) { - /* - * TODO rework to actually figure out where output goes instead of relying on input - * In streams we can overrode and access output binding, ect. - */ - return CloudEventMessageUtils.determinePrefixToUse(input.getHeaders()); - } private String getApplicationName() { ConfigurableEnvironment environment = this.applicationContext.getEnvironment();