From ada16079ca13a2ba819b078624c28392f91997fa Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 16 Nov 2020 12:06:33 +0100 Subject: [PATCH] GH-422 GH-606 Add support for normalizing structure-mode CE message Normalizing in this context means converting it to binary-mode so the rest of the processing logic is the same. Added support for canonical attribute names. Now, internally any attribute can be set as 'ce_' regardless where it came from are where it goes to as the frameork will be able to recognize both Removed CloudEventMessageConverter Renamed CloudEventAttributes to CloudEventAttributesHelperas it is better suited to what it actually does --- .../cloudevent/CloudEventAttributes.java | 84 ---------- .../CloudEventAttributesHelper.java | 125 +++++++++++++++ .../CloudEventAttributesProvider.java | 2 +- ...entDataContentTypeMessagePreProcessor.java | 130 ---------------- .../CloudEventJsonMessageConverter.java | 39 ----- .../cloudevent/CloudEventMessageUtils.java | 143 +++++++++++++----- .../DefaultCloudEventAttributesProvider.java | 5 +- .../cloudevent/RequiredAttributeAccessor.java | 41 ++++- .../catalog/SimpleFunctionRegistry.java | 4 + ...ntextFunctionCatalogAutoConfiguration.java | 5 - .../context/config/JsonMessageConverter.java | 4 +- .../SmartCompositeMessageConverter.java | 14 -- .../CloudEventTypeConversionTests.java | 34 +---- .../cloudevent/CloudeventDemoApplication.java | 4 +- .../CloudeventDemoApplicationRESTTests.java | 53 +++++-- 15 files changed, 325 insertions(+), 362 deletions(-) delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java deleted file mode 100644 index 0b534db81..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import java.util.HashMap; -import java.util.Map; - - -/** - * - * @author Oleg Zhurakousky - * @since 3.1 - */ -public class CloudEventAttributes extends HashMap { - - /** - * - */ - private static final long serialVersionUID = 5393610770855366497L; - - - - CloudEventAttributes(Map headers) { - super(headers); - } - - @SuppressWarnings("unchecked") - public A getId() { - return this.containsKey(CloudEventMessageUtils.CE_ID) - ? (A) this.get(CloudEventMessageUtils.CE_ID) - : (A) this.get(CloudEventMessageUtils.ID); - } - - @SuppressWarnings("unchecked") - public A getSource() { - return this.containsKey(CloudEventMessageUtils.CE_SOURCE) - ? (A) this.get(CloudEventMessageUtils.CE_SOURCE) - : (A) this.get(CloudEventMessageUtils.SOURCE); - } - - @SuppressWarnings("unchecked") - public A getSpecversion() { - return this.containsKey(CloudEventMessageUtils.CE_SPECVERSION) - ? (A) this.get(CloudEventMessageUtils.CE_SPECVERSION) - : (A) this.get(CloudEventMessageUtils.SPECVERSION); - } - - @SuppressWarnings("unchecked") - public A getType() { - return this.containsKey(CloudEventMessageUtils.CE_TYPE) - ? (A) this.get(CloudEventMessageUtils.CE_TYPE) - : (A) this.get(CloudEventMessageUtils.TYPE); - } - - @SuppressWarnings("unchecked") - public A getDataContentType() { - return this.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE) - ? (A) this.get(CloudEventMessageUtils.CE_DATACONTENTTYPE) - : (A) this.get(CloudEventMessageUtils.DATACONTENTTYPE); - } - - public void setDataContentType(String datacontenttype) { - this.put(CloudEventMessageUtils.CE_DATACONTENTTYPE, datacontenttype); - } - - @SuppressWarnings("unchecked") - public A getAtttribute(String name) { - return (A) this.get(name); - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java new file mode 100644 index 000000000..0050c2df8 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java @@ -0,0 +1,125 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.cloudevent; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.util.StringUtils; + + +/** + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +public class CloudEventAttributesHelper extends HashMap { + + /** + * + */ + private static final long serialVersionUID = 5393610770855366497L; + + + + CloudEventAttributesHelper(Map headers) { + super(headers); + } + + @SuppressWarnings("unchecked") + public A getId() { + if (this.containsKey(CloudEventMessageUtils.CANONICAL_ID)) { + return (A) this.get(CloudEventMessageUtils.CANONICAL_ID); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)) { + return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID); + } + return null; + } + + String getAttributeName(String attributeName) { + if (this.containsKey(CloudEventMessageUtils.ATTR_PREFIX + attributeName)) { + return CloudEventMessageUtils.ATTR_PREFIX + attributeName; + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName)) { + return CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName; + } + return attributeName; + } + + @SuppressWarnings("unchecked") + public A getSource() { + if (this.containsKey(CloudEventMessageUtils.CANONICAL_SOURCE)) { + return (A) this.get(CloudEventMessageUtils.CANONICAL_SOURCE); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) { + return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE); + } + return (A) this.get(CloudEventMessageUtils.SOURCE); + } + + @SuppressWarnings("unchecked") + public A getSpecversion() { + if (this.containsKey(CloudEventMessageUtils.CANONICAL_SPECVERSION)) { + return (A) this.get(CloudEventMessageUtils.CANONICAL_SPECVERSION); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION)) { + return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION); + } + return (A) this.get(CloudEventMessageUtils.SPECVERSION); + } + + @SuppressWarnings("unchecked") + public A getType() { + if (this.containsKey(CloudEventMessageUtils.CANONICAL_TYPE)) { + return (A) this.get(CloudEventMessageUtils.CANONICAL_TYPE); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) { + return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE); + } + return (A) this.get(CloudEventMessageUtils.TYPE); + } + + @SuppressWarnings("unchecked") + public A getDataContentType() { + Object dataContentType; + if (this.containsKey(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE)) { + dataContentType = this.get(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATACONTENTTYPE)) { + dataContentType = this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATACONTENTTYPE); + } + dataContentType = this.get(CloudEventMessageUtils.DATACONTENTTYPE); + return (A) dataContentType; + } + + public void setDataContentType(String datacontenttype) { + this.put(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, datacontenttype); + } + + @SuppressWarnings("unchecked") + public A getAtttribute(String name) { + return (A) this.get(name); + } + + public boolean isValidCloudEvent() { + return StringUtils.hasText(this.getId()) + && StringUtils.hasText(this.getSource()) + && StringUtils.hasText(this.getSpecversion()) + && StringUtils.hasText(this.getType()); + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java index 5e924d397..baa309604 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java @@ -31,7 +31,7 @@ public interface CloudEventAttributesProvider { * * @param inputMessage input message used to invoke user functionality (e.g., function) * @param result result of the invocation of user functionality (e.g., function) - * @return instance of {@link CloudEventAttributes} + * @return instance of {@link CloudEventAttributesHelper} */ Map generateDefaultCloudEventHeaders(Message inputMessage, Object result); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java deleted file mode 100644 index 531da48c9..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import java.util.Map; -import java.util.function.Function; - -import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.converter.CompositeMessageConverter; -import org.springframework.messaging.converter.ContentTypeResolver; -import org.springframework.messaging.converter.DefaultContentTypeResolver; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.Assert; -import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; - -/** - * A Cloud Events specific pre-processor that is added to {@link SmartCompositeMessageConverter} - * to potentially modify incoming message. - *

- * For Cloud Event coming in binary-mode such modification implies determining - * content type of the 'data' attribute (see {@link #getDataContentType(MessageHeaders)} - * of Cloud Event and creating a new {@link Message} with its `contentType` set to such - * content type while copying the rest of the headers. - *

- * Similar to Cloud Event coming in binary-mode, the Cloud Event coming in structured-mode - * such modification also implies determining content type of the 'data' attribute - * (see {@link #getDataContentType(MessageHeaders)}... - * - * @author Oleg Zhurakousky - * @since 3.1 - */ -public class CloudEventDataContentTypeMessagePreProcessor implements Function, Message> { - - private final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver(); - - private final MimeType cloudEventContentType = CloudEventMessageUtils.APPLICATION_CLOUDEVENTS; - - private final CompositeMessageConverter messageConverter; - - public CloudEventDataContentTypeMessagePreProcessor(CompositeMessageConverter messageConverter) { - Assert.notNull(messageConverter, "'messageConverter' must not be null"); - this.messageConverter = messageConverter; - } - - @SuppressWarnings("unchecked") - @Override - public Message apply(Message inputMessage) { - if (CloudEventMessageUtils.isBinary(inputMessage.getHeaders())) { - String dataContentType = this.getDataContentType(inputMessage.getHeaders()); - Message message = MessageBuilder.fromMessage(inputMessage) - .setHeader(MessageHeaders.CONTENT_TYPE, dataContentType) -// .setHeader("originalContentType", inputMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)) not sure about it - .build(); - return message; - } - else if (this.isStructured(inputMessage)) { - MimeType contentType = this.contentTypeResolver.resolve(inputMessage.getHeaders()); - String dataContentType = this.getDataContentType(inputMessage.getHeaders()); - String suffix = contentType.getSubtypeSuffix(); - MimeType cloudEventDeserializationContentType = MimeTypeUtils - .parseMimeType(contentType.getType() + "/" + suffix); - Message cloudEventMessage = MessageBuilder.fromMessage(inputMessage) - .setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType) - .setHeader(CloudEventMessageUtils.CE_DATACONTENTTYPE, dataContentType).build(); - Map structuredCloudEvent = (Map) this.messageConverter - .fromMessage(cloudEventMessage, Map.class); - Message binaryCeMessage = this.buildCeMessageFromStructured(structuredCloudEvent); - return binaryCeMessage; - } - else { - return inputMessage; - } - } - - private Message buildCeMessageFromStructured(Map structuredCloudEvent) { - MessageBuilder builder = MessageBuilder.withPayload( - structuredCloudEvent.containsKey(CloudEventMessageUtils.CE_DATA) - ? structuredCloudEvent.get(CloudEventMessageUtils.CE_DATA) - : structuredCloudEvent.get(CloudEventMessageUtils.DATA)); - structuredCloudEvent.remove(CloudEventMessageUtils.CE_DATA); - structuredCloudEvent.remove(CloudEventMessageUtils.DATA); - builder.copyHeaders(structuredCloudEvent); - return builder.build(); - } - - private String getDataContentType(MessageHeaders headers) { - if (headers.containsKey(CloudEventMessageUtils.DATACONTENTTYPE)) { - return (String) headers.get(CloudEventMessageUtils.DATACONTENTTYPE); - } - else if (headers.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE)) { - return (String) headers.get(CloudEventMessageUtils.CE_DATACONTENTTYPE); - } - else if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) { - return headers.get(MessageHeaders.CONTENT_TYPE).toString(); - } - return MimeTypeUtils.APPLICATION_JSON_VALUE; - } - - private boolean isStructured(Message message) { - if (!CloudEventMessageUtils.isBinary(message.getHeaders())) { - Map headers = message.getHeaders(); - - if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) { - MimeType contentType = this.contentTypeResolver.resolve(message.getHeaders()); - if (contentType.getType().equals(this.cloudEventContentType.getType()) - && contentType.getSubtype().startsWith(this.cloudEventContentType.getSubtype())) { - return true; - } - } - } - return false; - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java deleted file mode 100644 index 1ab79e861..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import org.springframework.cloud.function.context.config.JsonMessageConverter; -import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.messaging.converter.MessageConverter; -import org.springframework.util.MimeType; - -/** - * Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the - * actual conversion via {@link JsonMapper} instance. - * - * @author Oleg Zhurakousky - * - * @since 3.1 - */ -public class CloudEventJsonMessageConverter extends JsonMessageConverter { - - public CloudEventJsonMessageConverter(JsonMapper jsonMapper) { - super(jsonMapper, new MimeType(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType(), - CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype() + "+json")); - this.setStrictContentTypeMatch(true); - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 9ecd1c9fa..daf4d76b5 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -18,13 +18,19 @@ package org.springframework.cloud.function.cloudevent; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.ContentTypeResolver; +import org.springframework.messaging.converter.DefaultContentTypeResolver; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StringUtils; /** * Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/. @@ -36,6 +42,8 @@ import org.springframework.util.MimeTypeUtils; */ public final class CloudEventMessageUtils { + private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver(); + private CloudEventMessageUtils() { } @@ -53,7 +61,12 @@ public final class CloudEventMessageUtils { /** * Prefix for attributes. */ - public static String ATTR_PREFIX = "ce-"; + public static String ATTR_PREFIX = "ce_"; + + /** + * Prefix for attributes. + */ + public static String HTTP_ATTR_PREFIX = "ce-"; /** * Value for 'data' attribute. @@ -63,7 +76,7 @@ public final class CloudEventMessageUtils { /** * Value for 'data' attribute with prefix. */ - public static String CE_DATA = ATTR_PREFIX + DATA; + public static String CANONICAL_DATA = ATTR_PREFIX + DATA; /** * Value for 'id' attribute. @@ -73,7 +86,7 @@ public final class CloudEventMessageUtils { /** * Value for 'id' attribute with prefix. */ - public static String CE_ID = ATTR_PREFIX + ID; + public static String CANONICAL_ID = ATTR_PREFIX + ID; /** * Value for 'source' attribute. @@ -83,7 +96,7 @@ public final class CloudEventMessageUtils { /** * Value for 'source' attribute with prefix. */ - public static String CE_SOURCE = ATTR_PREFIX + SOURCE; + public static String CANONICAL_SOURCE = ATTR_PREFIX + SOURCE; /** * Value for 'specversion' attribute. @@ -93,7 +106,7 @@ public final class CloudEventMessageUtils { /** * Value for 'specversion' attribute with prefix. */ - public static String CE_SPECVERSION = ATTR_PREFIX + SPECVERSION; + public static String CANONICAL_SPECVERSION = ATTR_PREFIX + SPECVERSION; /** * Value for 'type' attribute. @@ -103,7 +116,7 @@ public final class CloudEventMessageUtils { /** * Value for 'type' attribute with prefix. */ - public static String CE_TYPE = ATTR_PREFIX + TYPE; + public static String CANONICAL_TYPE = ATTR_PREFIX + TYPE; /** * Value for 'datacontenttype' attribute. @@ -113,7 +126,7 @@ public final class CloudEventMessageUtils { /** * Value for 'datacontenttype' attribute with prefix. */ - public static String CE_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE; + public static String CANONICAL_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE; /** * Value for 'dataschema' attribute. @@ -123,7 +136,7 @@ public final class CloudEventMessageUtils { /** * Value for 'dataschema' attribute with prefix. */ - public static String CE_DATASCHEMA = ATTR_PREFIX + DATASCHEMA; + public static String CANONICAL_DATASCHEMA = ATTR_PREFIX + DATASCHEMA; /** * Value for 'subject' attribute. @@ -133,7 +146,7 @@ public final class CloudEventMessageUtils { /** * Value for 'subject' attribute with prefix. */ - public static String CE_SUBJECT = ATTR_PREFIX + SUBJECT; + public static String CANONICAL_SUBJECT = ATTR_PREFIX + SUBJECT; /** * Value for 'time' attribute. @@ -143,68 +156,130 @@ public final class CloudEventMessageUtils { /** * Value for 'time' attribute with prefix. */ - public static String CE_TIME = ATTR_PREFIX + TIME; + public static String CANONICAL_TIME = ATTR_PREFIX + TIME; /** * Checks if {@link Message} represents cloud event in binary-mode. */ public static boolean isBinary(Map headers) { - return (headers.containsKey(ID) - && headers.containsKey(SOURCE) - && headers.containsKey(SPECVERSION) - && headers.containsKey(TYPE)) - || - (headers.containsKey(CE_ID) - && headers.containsKey(CE_SOURCE) - && headers.containsKey(CE_SPECVERSION) - && headers.containsKey(CE_TYPE)); + CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(headers); + return attributes.isValidCloudEvent(); } - /** - * Will construct instance of {@link CloudEventAttributes} setting its required attributes. + * Will construct instance of {@link CloudEventAttributesHelper} setting its required attributes. * * @param ce_id value for Cloud Event 'id' attribute * @param ce_specversion value for Cloud Event 'specversion' attribute * @param ce_source value for Cloud Event 'source' attribute * @param ce_type value for Cloud Event 'type' attribute - * @return instance of {@link CloudEventAttributes} + * @return instance of {@link CloudEventAttributesHelper} */ - public static CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) { + public static CloudEventAttributesHelper get(String ce_id, String ce_specversion, String ce_source, String ce_type) { Assert.hasText(ce_id, "'ce_id' must not be null or empty"); Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty"); Assert.hasText(ce_source, "'ce_source' must not be null or empty"); Assert.hasText(ce_type, "'ce_type' must not be null or empty"); Map requiredAttributes = new HashMap<>(); - requiredAttributes.put(CloudEventMessageUtils.CE_ID, ce_id); - requiredAttributes.put(CloudEventMessageUtils.CE_SPECVERSION, ce_specversion); - requiredAttributes.put(CloudEventMessageUtils.CE_SOURCE, ce_source); - requiredAttributes.put(CloudEventMessageUtils.CE_TYPE, ce_type); - return new CloudEventAttributes(requiredAttributes); + requiredAttributes.put(CloudEventMessageUtils.CANONICAL_ID, ce_id); + requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SPECVERSION, ce_specversion); + requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SOURCE, ce_source); + requiredAttributes.put(CloudEventMessageUtils.CANONICAL_TYPE, ce_type); + return new CloudEventAttributesHelper(requiredAttributes); } /** - * Will construct instance of {@link CloudEventAttributes} + * Will construct instance of {@link CloudEventAttributesHelper} * Should default/generate cloud event ID and SPECVERSION. * * @param ce_source value for Cloud Event 'source' attribute * @param ce_type value for Cloud Event 'type' attribute - * @return instance of {@link CloudEventAttributes} + * @return instance of {@link CloudEventAttributesHelper} */ - public static CloudEventAttributes get(String ce_source, String ce_type) { + public static CloudEventAttributesHelper get(String ce_source, String ce_type) { return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type); } /** - * Will construct instance of {@link CloudEventAttributes} from {@link MessageHeaders}. + * Will construct instance of {@link CloudEventAttributesHelper} from {@link MessageHeaders}. * - * Should copy Cloud Event related headers into an instance of {@link CloudEventAttributes} + * Should copy Cloud Event related headers into an instance of {@link CloudEventAttributesHelper} * NOTE: Certain headers must not be copied. * * @param headers instance of {@link MessageHeaders} - * @return modifiable instance of {@link CloudEventAttributes} + * @return modifiable instance of {@link CloudEventAttributesHelper} */ public static RequiredAttributeAccessor get(MessageHeaders headers) { return new RequiredAttributeAccessor(headers); } + + + @SuppressWarnings("unchecked") + public static Message toBinary(Message inputMessage, MessageConverter messageConverter) { + + Map headers = inputMessage.getHeaders(); + CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(headers); + + // first check the obvious and see if content-type is `cloudevents` + if (!attributes.isValidCloudEvent() && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { + MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders()); + if (contentType.getType().equals(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType()) + && contentType.getSubtype().startsWith(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype())) { + + String dataContentType = StringUtils.hasText(attributes.getDataContentType()) + ? attributes.getDataContentType() + : MimeTypeUtils.APPLICATION_JSON_VALUE; + + String suffix = contentType.getSubtypeSuffix(); + MimeType cloudEventDeserializationContentType = MimeTypeUtils + .parseMimeType(contentType.getType() + "/" + suffix); + Message cloudEventMessage = MessageBuilder.fromMessage(inputMessage) + .setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType) + .setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType).build(); + Map structuredCloudEvent = (Map) messageConverter.fromMessage(cloudEventMessage, Map.class); + Message binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, determinePrefixToUse(inputMessage)); + return binaryCeMessage; + } + } + else if (StringUtils.hasText(attributes.getDataContentType())) { + return MessageBuilder.fromMessage(inputMessage) + .setHeader(MessageHeaders.CONTENT_TYPE, attributes.getDataContentType()) + .build(); + } + return inputMessage; + } + + private static Message buildCeMessageFromStructured(Map structuredCloudEvent, String prefixToUse) { + Object data = null; + if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) { + data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA); + structuredCloudEvent.remove(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA); + } + else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.CANONICAL_DATA)) { + data = structuredCloudEvent.get(CloudEventMessageUtils.CANONICAL_DATA); + structuredCloudEvent.remove(CloudEventMessageUtils.CANONICAL_DATA); + } + else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) { + data = structuredCloudEvent.get(CloudEventMessageUtils.DATA); + structuredCloudEvent.remove(CloudEventMessageUtils.DATA); + } + Assert.notNull(data, "'data' must not be null"); + MessageBuilder builder = MessageBuilder.withPayload(data); + CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(structuredCloudEvent); + builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId()); + builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource()); + builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType()); + builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion()); + return builder.build(); + } + + public static String determinePrefixToUse(Message inputMessage) { + Set keys = inputMessage.getHeaders().keySet(); + if (keys.contains("user-agent")) { + return CloudEventMessageUtils.HTTP_ATTR_PREFIX; + } + else { + return CloudEventMessageUtils.ATTR_PREFIX; + } + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java index 8e7216b83..d49627c54 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java @@ -41,9 +41,10 @@ public class DefaultCloudEventAttributesProvider implements CloudEventAttributes @Override public Map generateDefaultCloudEventHeaders(Message inputMessage, Object result) { - if (inputMessage.getHeaders().containsKey(CloudEventMessageUtils.CE_ID)) { // input is a cloud event + RequiredAttributeAccessor attributes = new RequiredAttributeAccessor(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage)); + if (attributes.isValidCloudEvent()) { String applicationName = this.getApplicationName(); - return CloudEventMessageUtils.get(inputMessage.getHeaders()) + return attributes .setId(UUID.randomUUID().toString()) .setType(result.getClass().getName()) .setSource(applicationName); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java index c9b440188..501610de8 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java @@ -18,39 +18,68 @@ package org.springframework.cloud.function.cloudevent; import java.util.Map; +import org.springframework.util.StringUtils; + /** * * @author Oleg Zhurakousky * @since 3.1 */ -public class RequiredAttributeAccessor extends CloudEventAttributes { +public class RequiredAttributeAccessor extends CloudEventAttributesHelper { + + private final String prefixToUse; /** * */ private static final long serialVersionUID = 859410409447601477L; - RequiredAttributeAccessor(Map headers) { + RequiredAttributeAccessor(Map headers, String prefixToUse) { super(headers); + this.prefixToUse = prefixToUse; + } + + RequiredAttributeAccessor(Map headers) { + this(headers, null); } public RequiredAttributeAccessor setId(String id) { - this.put(CloudEventMessageUtils.CE_ID, id); + if (StringUtils.hasText(this.prefixToUse)) { + this.put(this.prefixToUse + CloudEventMessageUtils.ID, id); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.ID), id); + } return this; } public RequiredAttributeAccessor setSource(String source) { - this.put(CloudEventMessageUtils.CE_SOURCE, source); + if (StringUtils.hasText(this.prefixToUse)) { + this.put(this.prefixToUse + CloudEventMessageUtils.SOURCE, source); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.SOURCE), source); + } return this; } public RequiredAttributeAccessor setSpecversion(String specversion) { - this.put(CloudEventMessageUtils.CE_SPECVERSION, specversion); + if (StringUtils.hasText(this.prefixToUse)) { + this.put(this.prefixToUse + CloudEventMessageUtils.SPECVERSION, specversion); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.SPECVERSION), specversion); + } return this; } public RequiredAttributeAccessor setType(String type) { - this.put(CloudEventMessageUtils.CE_TYPE, type); + if (StringUtils.hasText(this.prefixToUse)) { + this.put(this.prefixToUse + CloudEventMessageUtils.TYPE, type); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.TYPE), type); + } return this; } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index d4351fdd1..99f92421e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -47,6 +47,7 @@ import reactor.util.function.Tuples; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; @@ -821,6 +822,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect && !this.isInputTypeMessage()) { //TODO rework return null; } + + input = CloudEventMessageUtils.toBinary((Message) input, messageConverter); + convertedInput = this.convertInputMessageIfNecessary((Message) input, type); if (convertedInput == null) { // give ConversionService a chance convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 5a8335184..aed66e6e4 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -31,8 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; -import org.springframework.cloud.function.cloudevent.CloudEventDataContentTypeMessagePreProcessor; -import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter; import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; @@ -107,15 +105,12 @@ public class ContextFunctionCatalogAutoConfiguration { .collect(Collectors.toList()); mcList.add(new JsonMessageConverter(jsonMapper)); - mcList.add(new CloudEventJsonMessageConverter(jsonMapper)); mcList.add(new ByteArrayMessageConverter()); mcList.add(new StringMessageConverter()); mcList.add(new PrimitiveTypesFromStringMessageConverter(conversionService)); if (!CollectionUtils.isEmpty(mcList)) { messageConverter = new SmartCompositeMessageConverter(mcList); - CloudEventDataContentTypeMessagePreProcessor messagePreProcessor = new CloudEventDataContentTypeMessagePreProcessor(messageConverter); - messageConverter.setMessagePreProcessor(messagePreProcessor); } return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java index cb7a16798..68bf27209 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java @@ -20,6 +20,7 @@ import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.Collection; +import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -42,7 +43,8 @@ public class JsonMessageConverter extends AbstractMessageConverter { private final JsonMapper jsonMapper; public JsonMessageConverter(JsonMapper jsonMapper) { - this(jsonMapper, new MimeType("application", "json")); + this(jsonMapper, new MimeType("application", "json"), new MimeType(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType(), + CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype() + "+json")); } public JsonMessageConverter(JsonMapper jsonMapper, MimeType... supportedMimeTypes) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java index 2b256b533..7a12c2df9 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java @@ -18,7 +18,6 @@ package org.springframework.cloud.function.context.config; import java.util.Collection; import java.util.List; -import java.util.function.Function; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -38,8 +37,6 @@ import org.springframework.util.StringUtils; */ public class SmartCompositeMessageConverter extends CompositeMessageConverter { - private Function, Message> preProcessor; - public SmartCompositeMessageConverter(Collection converters) { super(converters); } @@ -47,9 +44,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { @Override @Nullable public Object fromMessage(Message message, Class targetClass) { - if (this.preProcessor != null) { - message = this.preProcessor.apply(message); - } for (MessageConverter converter : getConverters()) { Object result = converter.fromMessage(message, targetClass); if (result != null) { @@ -62,9 +56,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { @Override @Nullable public Object fromMessage(Message message, Class targetClass, @Nullable Object conversionHint) { - if (this.preProcessor != null) { - message = this.preProcessor.apply(message); - } for (MessageConverter converter : getConverters()) { Object result = (converter instanceof SmartMessageConverter ? ((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) : @@ -76,7 +67,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { return null; } - @SuppressWarnings("unchecked") @Override @Nullable public Message toMessage(Object payload, @Nullable MessageHeaders headers) { @@ -142,8 +132,4 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { } return null; } - - public void setMessagePreProcessor(Function, Message> preProcessor) { - this.preProcessor = preProcessor; - } } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java index d77f694a8..83b20f88e 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java @@ -45,7 +45,7 @@ public class CloudEventTypeConversionTests { @Test public void testFromMessageBinaryPayloadMatchesType() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); ceAttributes.setDataContentType("text/plain"); Message message = MessageBuilder.withPayload("Hello Ricky").copyHeaders(ceAttributes).build(); @@ -57,7 +57,7 @@ public class CloudEventTypeConversionTests { @Test public void testFromMessageBinaryPayloadDoesNotMatchType() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) @@ -71,7 +71,7 @@ public class CloudEventTypeConversionTests { @Test // JsonMessageConverter does some special things between byte[] and String so this works public void testFromMessageBinaryPayloadNoDataContentTypeToString() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) @@ -85,7 +85,7 @@ public class CloudEventTypeConversionTests { @Test // Unlike the previous test the type here is POJO so no special treatement public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) .setHeader(MessageHeaders.CONTENT_TYPE, @@ -98,7 +98,7 @@ public class CloudEventTypeConversionTests { @Test // will fall on default CT which is json public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes()) .copyHeaders(ceAttributes) .setHeader(MessageHeaders.CONTENT_TYPE, @@ -108,30 +108,6 @@ public class CloudEventTypeConversionTests { assertThat(converted.getName()).isEqualTo("Ricky"); } - @Test // will fall on default CT which is json - public void testFromMessageStructured() { - String cloudEventStructured = "{\n" + - " \"specversion\" : \"1.0\",\n" + - " \"type\" : \"org.springframework\",\n" + - " \"source\" : \"https://spring.io/\",\n" + - " \"id\" : \"A234-1234-1234\",\n" + - " \"datacontenttype\" : \"application/json\",\n" + - " \"data\" : {\n" + - " \"version\" : \"1.0\",\n" + - " \"releaseName\" : \"Spring Framework\",\n" + - " \"releaseDate\" : \"24-03-2004\"\n" + - " }\n" + - " }"; - SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - Message message = MessageBuilder.withPayload(cloudEventStructured) - .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") - .setHeader(CloudEventMessageUtils.CE_DATACONTENTTYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build(); - SpringReleaseEvent springReleaseEvent = (SpringReleaseEvent) messageConverter.fromMessage(message, - SpringReleaseEvent.class); - assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework"); - assertThat(springReleaseEvent.getVersion()).isEqualTo("1.0"); - } - private SmartCompositeMessageConverter configure(Class... configClass) { ApplicationContext context = new SpringApplicationBuilder(configClass).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.main.lazy-initialization=true"); diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index c23b0fab8..6e5cb1156 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -21,7 +21,7 @@ import java.util.function.Function; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.function.cloudevent.CloudEventAttributes; +import org.springframework.cloud.function.cloudevent.CloudEventAttributesHelper; import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.context.annotation.Bean; @@ -90,7 +90,7 @@ public class CloudeventDemoApplication { data.setVersion("2.0"); data.setReleaseDateAsString("01-10-2006"); - CloudEventAttributes ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders()) + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders()) .setSource("https://interface21.com/") .setType("com.interface21"); diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index 3e2602afd..c841cff80 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -22,7 +22,6 @@ import java.net.URI; import java.text.SimpleDateFormat; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -30,10 +29,7 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.SpringApplication; import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; -import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; -import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; @@ -49,7 +45,6 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.AbstractMessageConverter; import org.springframework.messaging.converter.MessageConverter; import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; import org.springframework.util.SocketUtils; /** @@ -219,9 +214,9 @@ public class CloudeventDemoApplicationRESTTests { ResponseEntity response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2050\",\"releaseName\":\"Spring Framework\",\"version\":\"10.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("http://spring.io/application-application")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList(LinkedHashMap.class.getName())); } @@ -236,12 +231,40 @@ public class CloudeventDemoApplicationRESTTests { ResponseEntity response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("http://spring.io/application-application")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); } + + /* + * Typically this would never happen since spec mandates that HTTP uses 'ce-` prefix. + * So this is to primarily validate that we can recognize it process it and still produce correct headers + */ + @Test + public void testAsBinaryPojoToPojoWrongHeaders() throws Exception { + SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString()); + headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/"); + headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0"); + headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework"); + String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; + + RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo")); + ResponseEntity response = testRestTemplate.exchange(re, String.class); + + assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}"); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + .isEqualTo(Collections.singletonList("http://spring.io/application-application")); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); + } + + @Test public void testAsStructuralPojoToPojo() throws Exception { ApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class); @@ -281,9 +304,9 @@ public class CloudeventDemoApplicationRESTTests { assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0"); -// assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE)) +// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_SOURCE)) // .isEqualTo(Collections.singletonList("http://spring.io/application-application")); -// assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE)) +// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_TYPE)) // .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); } @@ -294,10 +317,10 @@ public class CloudeventDemoApplicationRESTTests { private HttpHeaders buildHeaders(MediaType contentType) { HttpHeaders headers = new HttpHeaders(); headers.setContentType(contentType); - headers.set(CloudEventMessageUtils.CE_ID, UUID.randomUUID().toString()); - headers.set(CloudEventMessageUtils.CE_SOURCE, "https://spring.io/"); - headers.set(CloudEventMessageUtils.CE_SPECVERSION, "1.0"); - headers.set(CloudEventMessageUtils.CE_TYPE, "org.springframework"); + headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString()); + headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/"); + headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0"); + headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework"); return headers; }