diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java deleted file mode 100644 index 0b534db81..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributes.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import java.util.HashMap; -import java.util.Map; - - -/** - * - * @author Oleg Zhurakousky - * @since 3.1 - */ -public class CloudEventAttributes extends HashMap { - - /** - * - */ - private static final long serialVersionUID = 5393610770855366497L; - - - - CloudEventAttributes(Map headers) { - super(headers); - } - - @SuppressWarnings("unchecked") - public A getId() { - return this.containsKey(CloudEventMessageUtils.CE_ID) - ? (A) this.get(CloudEventMessageUtils.CE_ID) - : (A) this.get(CloudEventMessageUtils.ID); - } - - @SuppressWarnings("unchecked") - public A getSource() { - return this.containsKey(CloudEventMessageUtils.CE_SOURCE) - ? (A) this.get(CloudEventMessageUtils.CE_SOURCE) - : (A) this.get(CloudEventMessageUtils.SOURCE); - } - - @SuppressWarnings("unchecked") - public A getSpecversion() { - return this.containsKey(CloudEventMessageUtils.CE_SPECVERSION) - ? (A) this.get(CloudEventMessageUtils.CE_SPECVERSION) - : (A) this.get(CloudEventMessageUtils.SPECVERSION); - } - - @SuppressWarnings("unchecked") - public A getType() { - return this.containsKey(CloudEventMessageUtils.CE_TYPE) - ? (A) this.get(CloudEventMessageUtils.CE_TYPE) - : (A) this.get(CloudEventMessageUtils.TYPE); - } - - @SuppressWarnings("unchecked") - public A getDataContentType() { - return this.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE) - ? (A) this.get(CloudEventMessageUtils.CE_DATACONTENTTYPE) - : (A) this.get(CloudEventMessageUtils.DATACONTENTTYPE); - } - - public void setDataContentType(String datacontenttype) { - this.put(CloudEventMessageUtils.CE_DATACONTENTTYPE, datacontenttype); - } - - @SuppressWarnings("unchecked") - public A getAtttribute(String name) { - return (A) this.get(name); - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java new file mode 100644 index 000000000..0050c2df8 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesHelper.java @@ -0,0 +1,125 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.cloudevent; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.util.StringUtils; + + +/** + * + * @author Oleg Zhurakousky + * @since 3.1 + */ +public class CloudEventAttributesHelper extends HashMap { + + /** + * + */ + private static final long serialVersionUID = 5393610770855366497L; + + + + CloudEventAttributesHelper(Map headers) { + super(headers); + } + + @SuppressWarnings("unchecked") + public A getId() { + if (this.containsKey(CloudEventMessageUtils.CANONICAL_ID)) { + return (A) this.get(CloudEventMessageUtils.CANONICAL_ID); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)) { + return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID); + } + return null; + } + + String getAttributeName(String attributeName) { + if (this.containsKey(CloudEventMessageUtils.ATTR_PREFIX + attributeName)) { + return CloudEventMessageUtils.ATTR_PREFIX + attributeName; + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName)) { + return CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName; + } + return attributeName; + } + + @SuppressWarnings("unchecked") + public A getSource() { + if (this.containsKey(CloudEventMessageUtils.CANONICAL_SOURCE)) { + return (A) this.get(CloudEventMessageUtils.CANONICAL_SOURCE); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) { + return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE); + } + return (A) this.get(CloudEventMessageUtils.SOURCE); + } + + @SuppressWarnings("unchecked") + public A getSpecversion() { + if (this.containsKey(CloudEventMessageUtils.CANONICAL_SPECVERSION)) { + return (A) this.get(CloudEventMessageUtils.CANONICAL_SPECVERSION); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION)) { + return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION); + } + return (A) this.get(CloudEventMessageUtils.SPECVERSION); + } + + @SuppressWarnings("unchecked") + public A getType() { + if (this.containsKey(CloudEventMessageUtils.CANONICAL_TYPE)) { + return (A) this.get(CloudEventMessageUtils.CANONICAL_TYPE); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) { + return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE); + } + return (A) this.get(CloudEventMessageUtils.TYPE); + } + + @SuppressWarnings("unchecked") + public A getDataContentType() { + Object dataContentType; + if (this.containsKey(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE)) { + dataContentType = this.get(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE); + } + else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATACONTENTTYPE)) { + dataContentType = this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATACONTENTTYPE); + } + dataContentType = this.get(CloudEventMessageUtils.DATACONTENTTYPE); + return (A) dataContentType; + } + + public void setDataContentType(String datacontenttype) { + this.put(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, datacontenttype); + } + + @SuppressWarnings("unchecked") + public A getAtttribute(String name) { + return (A) this.get(name); + } + + public boolean isValidCloudEvent() { + return StringUtils.hasText(this.getId()) + && StringUtils.hasText(this.getSource()) + && StringUtils.hasText(this.getSpecversion()) + && StringUtils.hasText(this.getType()); + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java index 5e924d397..baa309604 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java @@ -31,7 +31,7 @@ public interface CloudEventAttributesProvider { * * @param inputMessage input message used to invoke user functionality (e.g., function) * @param result result of the invocation of user functionality (e.g., function) - * @return instance of {@link CloudEventAttributes} + * @return instance of {@link CloudEventAttributesHelper} */ Map generateDefaultCloudEventHeaders(Message inputMessage, Object result); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java deleted file mode 100644 index 531da48c9..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import java.util.Map; -import java.util.function.Function; - -import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.converter.CompositeMessageConverter; -import org.springframework.messaging.converter.ContentTypeResolver; -import org.springframework.messaging.converter.DefaultContentTypeResolver; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.Assert; -import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; - -/** - * A Cloud Events specific pre-processor that is added to {@link SmartCompositeMessageConverter} - * to potentially modify incoming message. - *

- * For Cloud Event coming in binary-mode such modification implies determining - * content type of the 'data' attribute (see {@link #getDataContentType(MessageHeaders)} - * of Cloud Event and creating a new {@link Message} with its `contentType` set to such - * content type while copying the rest of the headers. - *

- * Similar to Cloud Event coming in binary-mode, the Cloud Event coming in structured-mode - * such modification also implies determining content type of the 'data' attribute - * (see {@link #getDataContentType(MessageHeaders)}... - * - * @author Oleg Zhurakousky - * @since 3.1 - */ -public class CloudEventDataContentTypeMessagePreProcessor implements Function, Message> { - - private final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver(); - - private final MimeType cloudEventContentType = CloudEventMessageUtils.APPLICATION_CLOUDEVENTS; - - private final CompositeMessageConverter messageConverter; - - public CloudEventDataContentTypeMessagePreProcessor(CompositeMessageConverter messageConverter) { - Assert.notNull(messageConverter, "'messageConverter' must not be null"); - this.messageConverter = messageConverter; - } - - @SuppressWarnings("unchecked") - @Override - public Message apply(Message inputMessage) { - if (CloudEventMessageUtils.isBinary(inputMessage.getHeaders())) { - String dataContentType = this.getDataContentType(inputMessage.getHeaders()); - Message message = MessageBuilder.fromMessage(inputMessage) - .setHeader(MessageHeaders.CONTENT_TYPE, dataContentType) -// .setHeader("originalContentType", inputMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)) not sure about it - .build(); - return message; - } - else if (this.isStructured(inputMessage)) { - MimeType contentType = this.contentTypeResolver.resolve(inputMessage.getHeaders()); - String dataContentType = this.getDataContentType(inputMessage.getHeaders()); - String suffix = contentType.getSubtypeSuffix(); - MimeType cloudEventDeserializationContentType = MimeTypeUtils - .parseMimeType(contentType.getType() + "/" + suffix); - Message cloudEventMessage = MessageBuilder.fromMessage(inputMessage) - .setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType) - .setHeader(CloudEventMessageUtils.CE_DATACONTENTTYPE, dataContentType).build(); - Map structuredCloudEvent = (Map) this.messageConverter - .fromMessage(cloudEventMessage, Map.class); - Message binaryCeMessage = this.buildCeMessageFromStructured(structuredCloudEvent); - return binaryCeMessage; - } - else { - return inputMessage; - } - } - - private Message buildCeMessageFromStructured(Map structuredCloudEvent) { - MessageBuilder builder = MessageBuilder.withPayload( - structuredCloudEvent.containsKey(CloudEventMessageUtils.CE_DATA) - ? structuredCloudEvent.get(CloudEventMessageUtils.CE_DATA) - : structuredCloudEvent.get(CloudEventMessageUtils.DATA)); - structuredCloudEvent.remove(CloudEventMessageUtils.CE_DATA); - structuredCloudEvent.remove(CloudEventMessageUtils.DATA); - builder.copyHeaders(structuredCloudEvent); - return builder.build(); - } - - private String getDataContentType(MessageHeaders headers) { - if (headers.containsKey(CloudEventMessageUtils.DATACONTENTTYPE)) { - return (String) headers.get(CloudEventMessageUtils.DATACONTENTTYPE); - } - else if (headers.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE)) { - return (String) headers.get(CloudEventMessageUtils.CE_DATACONTENTTYPE); - } - else if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) { - return headers.get(MessageHeaders.CONTENT_TYPE).toString(); - } - return MimeTypeUtils.APPLICATION_JSON_VALUE; - } - - private boolean isStructured(Message message) { - if (!CloudEventMessageUtils.isBinary(message.getHeaders())) { - Map headers = message.getHeaders(); - - if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) { - MimeType contentType = this.contentTypeResolver.resolve(message.getHeaders()); - if (contentType.getType().equals(this.cloudEventContentType.getType()) - && contentType.getSubtype().startsWith(this.cloudEventContentType.getSubtype())) { - return true; - } - } - } - return false; - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java deleted file mode 100644 index 1ab79e861..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventJsonMessageConverter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2020-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.cloudevent; - -import org.springframework.cloud.function.context.config.JsonMessageConverter; -import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.messaging.converter.MessageConverter; -import org.springframework.util.MimeType; - -/** - * Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the - * actual conversion via {@link JsonMapper} instance. - * - * @author Oleg Zhurakousky - * - * @since 3.1 - */ -public class CloudEventJsonMessageConverter extends JsonMessageConverter { - - public CloudEventJsonMessageConverter(JsonMapper jsonMapper) { - super(jsonMapper, new MimeType(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType(), - CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype() + "+json")); - this.setStrictContentTypeMatch(true); - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 9ecd1c9fa..daf4d76b5 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -18,13 +18,19 @@ package org.springframework.cloud.function.cloudevent; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.ContentTypeResolver; +import org.springframework.messaging.converter.DefaultContentTypeResolver; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StringUtils; /** * Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/. @@ -36,6 +42,8 @@ import org.springframework.util.MimeTypeUtils; */ public final class CloudEventMessageUtils { + private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver(); + private CloudEventMessageUtils() { } @@ -53,7 +61,12 @@ public final class CloudEventMessageUtils { /** * Prefix for attributes. */ - public static String ATTR_PREFIX = "ce-"; + public static String ATTR_PREFIX = "ce_"; + + /** + * Prefix for attributes. + */ + public static String HTTP_ATTR_PREFIX = "ce-"; /** * Value for 'data' attribute. @@ -63,7 +76,7 @@ public final class CloudEventMessageUtils { /** * Value for 'data' attribute with prefix. */ - public static String CE_DATA = ATTR_PREFIX + DATA; + public static String CANONICAL_DATA = ATTR_PREFIX + DATA; /** * Value for 'id' attribute. @@ -73,7 +86,7 @@ public final class CloudEventMessageUtils { /** * Value for 'id' attribute with prefix. */ - public static String CE_ID = ATTR_PREFIX + ID; + public static String CANONICAL_ID = ATTR_PREFIX + ID; /** * Value for 'source' attribute. @@ -83,7 +96,7 @@ public final class CloudEventMessageUtils { /** * Value for 'source' attribute with prefix. */ - public static String CE_SOURCE = ATTR_PREFIX + SOURCE; + public static String CANONICAL_SOURCE = ATTR_PREFIX + SOURCE; /** * Value for 'specversion' attribute. @@ -93,7 +106,7 @@ public final class CloudEventMessageUtils { /** * Value for 'specversion' attribute with prefix. */ - public static String CE_SPECVERSION = ATTR_PREFIX + SPECVERSION; + public static String CANONICAL_SPECVERSION = ATTR_PREFIX + SPECVERSION; /** * Value for 'type' attribute. @@ -103,7 +116,7 @@ public final class CloudEventMessageUtils { /** * Value for 'type' attribute with prefix. */ - public static String CE_TYPE = ATTR_PREFIX + TYPE; + public static String CANONICAL_TYPE = ATTR_PREFIX + TYPE; /** * Value for 'datacontenttype' attribute. @@ -113,7 +126,7 @@ public final class CloudEventMessageUtils { /** * Value for 'datacontenttype' attribute with prefix. */ - public static String CE_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE; + public static String CANONICAL_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE; /** * Value for 'dataschema' attribute. @@ -123,7 +136,7 @@ public final class CloudEventMessageUtils { /** * Value for 'dataschema' attribute with prefix. */ - public static String CE_DATASCHEMA = ATTR_PREFIX + DATASCHEMA; + public static String CANONICAL_DATASCHEMA = ATTR_PREFIX + DATASCHEMA; /** * Value for 'subject' attribute. @@ -133,7 +146,7 @@ public final class CloudEventMessageUtils { /** * Value for 'subject' attribute with prefix. */ - public static String CE_SUBJECT = ATTR_PREFIX + SUBJECT; + public static String CANONICAL_SUBJECT = ATTR_PREFIX + SUBJECT; /** * Value for 'time' attribute. @@ -143,68 +156,130 @@ public final class CloudEventMessageUtils { /** * Value for 'time' attribute with prefix. */ - public static String CE_TIME = ATTR_PREFIX + TIME; + public static String CANONICAL_TIME = ATTR_PREFIX + TIME; /** * Checks if {@link Message} represents cloud event in binary-mode. */ public static boolean isBinary(Map headers) { - return (headers.containsKey(ID) - && headers.containsKey(SOURCE) - && headers.containsKey(SPECVERSION) - && headers.containsKey(TYPE)) - || - (headers.containsKey(CE_ID) - && headers.containsKey(CE_SOURCE) - && headers.containsKey(CE_SPECVERSION) - && headers.containsKey(CE_TYPE)); + CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(headers); + return attributes.isValidCloudEvent(); } - /** - * Will construct instance of {@link CloudEventAttributes} setting its required attributes. + * Will construct instance of {@link CloudEventAttributesHelper} setting its required attributes. * * @param ce_id value for Cloud Event 'id' attribute * @param ce_specversion value for Cloud Event 'specversion' attribute * @param ce_source value for Cloud Event 'source' attribute * @param ce_type value for Cloud Event 'type' attribute - * @return instance of {@link CloudEventAttributes} + * @return instance of {@link CloudEventAttributesHelper} */ - public static CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) { + public static CloudEventAttributesHelper get(String ce_id, String ce_specversion, String ce_source, String ce_type) { Assert.hasText(ce_id, "'ce_id' must not be null or empty"); Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty"); Assert.hasText(ce_source, "'ce_source' must not be null or empty"); Assert.hasText(ce_type, "'ce_type' must not be null or empty"); Map requiredAttributes = new HashMap<>(); - requiredAttributes.put(CloudEventMessageUtils.CE_ID, ce_id); - requiredAttributes.put(CloudEventMessageUtils.CE_SPECVERSION, ce_specversion); - requiredAttributes.put(CloudEventMessageUtils.CE_SOURCE, ce_source); - requiredAttributes.put(CloudEventMessageUtils.CE_TYPE, ce_type); - return new CloudEventAttributes(requiredAttributes); + requiredAttributes.put(CloudEventMessageUtils.CANONICAL_ID, ce_id); + requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SPECVERSION, ce_specversion); + requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SOURCE, ce_source); + requiredAttributes.put(CloudEventMessageUtils.CANONICAL_TYPE, ce_type); + return new CloudEventAttributesHelper(requiredAttributes); } /** - * Will construct instance of {@link CloudEventAttributes} + * Will construct instance of {@link CloudEventAttributesHelper} * Should default/generate cloud event ID and SPECVERSION. * * @param ce_source value for Cloud Event 'source' attribute * @param ce_type value for Cloud Event 'type' attribute - * @return instance of {@link CloudEventAttributes} + * @return instance of {@link CloudEventAttributesHelper} */ - public static CloudEventAttributes get(String ce_source, String ce_type) { + public static CloudEventAttributesHelper get(String ce_source, String ce_type) { return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type); } /** - * Will construct instance of {@link CloudEventAttributes} from {@link MessageHeaders}. + * Will construct instance of {@link CloudEventAttributesHelper} from {@link MessageHeaders}. * - * Should copy Cloud Event related headers into an instance of {@link CloudEventAttributes} + * Should copy Cloud Event related headers into an instance of {@link CloudEventAttributesHelper} * NOTE: Certain headers must not be copied. * * @param headers instance of {@link MessageHeaders} - * @return modifiable instance of {@link CloudEventAttributes} + * @return modifiable instance of {@link CloudEventAttributesHelper} */ public static RequiredAttributeAccessor get(MessageHeaders headers) { return new RequiredAttributeAccessor(headers); } + + + @SuppressWarnings("unchecked") + public static Message toBinary(Message inputMessage, MessageConverter messageConverter) { + + Map headers = inputMessage.getHeaders(); + CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(headers); + + // first check the obvious and see if content-type is `cloudevents` + if (!attributes.isValidCloudEvent() && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { + MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders()); + if (contentType.getType().equals(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType()) + && contentType.getSubtype().startsWith(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype())) { + + String dataContentType = StringUtils.hasText(attributes.getDataContentType()) + ? attributes.getDataContentType() + : MimeTypeUtils.APPLICATION_JSON_VALUE; + + String suffix = contentType.getSubtypeSuffix(); + MimeType cloudEventDeserializationContentType = MimeTypeUtils + .parseMimeType(contentType.getType() + "/" + suffix); + Message cloudEventMessage = MessageBuilder.fromMessage(inputMessage) + .setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType) + .setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType).build(); + Map structuredCloudEvent = (Map) messageConverter.fromMessage(cloudEventMessage, Map.class); + Message binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, determinePrefixToUse(inputMessage)); + return binaryCeMessage; + } + } + else if (StringUtils.hasText(attributes.getDataContentType())) { + return MessageBuilder.fromMessage(inputMessage) + .setHeader(MessageHeaders.CONTENT_TYPE, attributes.getDataContentType()) + .build(); + } + return inputMessage; + } + + private static Message buildCeMessageFromStructured(Map structuredCloudEvent, String prefixToUse) { + Object data = null; + if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) { + data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA); + structuredCloudEvent.remove(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA); + } + else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.CANONICAL_DATA)) { + data = structuredCloudEvent.get(CloudEventMessageUtils.CANONICAL_DATA); + structuredCloudEvent.remove(CloudEventMessageUtils.CANONICAL_DATA); + } + else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) { + data = structuredCloudEvent.get(CloudEventMessageUtils.DATA); + structuredCloudEvent.remove(CloudEventMessageUtils.DATA); + } + Assert.notNull(data, "'data' must not be null"); + MessageBuilder builder = MessageBuilder.withPayload(data); + CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(structuredCloudEvent); + builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId()); + builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource()); + builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType()); + builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion()); + return builder.build(); + } + + public static String determinePrefixToUse(Message inputMessage) { + Set keys = inputMessage.getHeaders().keySet(); + if (keys.contains("user-agent")) { + return CloudEventMessageUtils.HTTP_ATTR_PREFIX; + } + else { + return CloudEventMessageUtils.ATTR_PREFIX; + } + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java index 8e7216b83..d49627c54 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java @@ -41,9 +41,10 @@ public class DefaultCloudEventAttributesProvider implements CloudEventAttributes @Override public Map generateDefaultCloudEventHeaders(Message inputMessage, Object result) { - if (inputMessage.getHeaders().containsKey(CloudEventMessageUtils.CE_ID)) { // input is a cloud event + RequiredAttributeAccessor attributes = new RequiredAttributeAccessor(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage)); + if (attributes.isValidCloudEvent()) { String applicationName = this.getApplicationName(); - return CloudEventMessageUtils.get(inputMessage.getHeaders()) + return attributes .setId(UUID.randomUUID().toString()) .setType(result.getClass().getName()) .setSource(applicationName); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java index c9b440188..501610de8 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/RequiredAttributeAccessor.java @@ -18,39 +18,68 @@ package org.springframework.cloud.function.cloudevent; import java.util.Map; +import org.springframework.util.StringUtils; + /** * * @author Oleg Zhurakousky * @since 3.1 */ -public class RequiredAttributeAccessor extends CloudEventAttributes { +public class RequiredAttributeAccessor extends CloudEventAttributesHelper { + + private final String prefixToUse; /** * */ private static final long serialVersionUID = 859410409447601477L; - RequiredAttributeAccessor(Map headers) { + RequiredAttributeAccessor(Map headers, String prefixToUse) { super(headers); + this.prefixToUse = prefixToUse; + } + + RequiredAttributeAccessor(Map headers) { + this(headers, null); } public RequiredAttributeAccessor setId(String id) { - this.put(CloudEventMessageUtils.CE_ID, id); + if (StringUtils.hasText(this.prefixToUse)) { + this.put(this.prefixToUse + CloudEventMessageUtils.ID, id); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.ID), id); + } return this; } public RequiredAttributeAccessor setSource(String source) { - this.put(CloudEventMessageUtils.CE_SOURCE, source); + if (StringUtils.hasText(this.prefixToUse)) { + this.put(this.prefixToUse + CloudEventMessageUtils.SOURCE, source); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.SOURCE), source); + } return this; } public RequiredAttributeAccessor setSpecversion(String specversion) { - this.put(CloudEventMessageUtils.CE_SPECVERSION, specversion); + if (StringUtils.hasText(this.prefixToUse)) { + this.put(this.prefixToUse + CloudEventMessageUtils.SPECVERSION, specversion); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.SPECVERSION), specversion); + } return this; } public RequiredAttributeAccessor setType(String type) { - this.put(CloudEventMessageUtils.CE_TYPE, type); + if (StringUtils.hasText(this.prefixToUse)) { + this.put(this.prefixToUse + CloudEventMessageUtils.TYPE, type); + } + else { + this.put(this.getAttributeName(CloudEventMessageUtils.TYPE), type); + } return this; } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index d4351fdd1..99f92421e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -47,6 +47,7 @@ import reactor.util.function.Tuples; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; @@ -821,6 +822,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect && !this.isInputTypeMessage()) { //TODO rework return null; } + + input = CloudEventMessageUtils.toBinary((Message) input, messageConverter); + convertedInput = this.convertInputMessageIfNecessary((Message) input, type); if (convertedInput == null) { // give ConversionService a chance convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 5a8335184..aed66e6e4 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -31,8 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; -import org.springframework.cloud.function.cloudevent.CloudEventDataContentTypeMessagePreProcessor; -import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter; import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; @@ -107,15 +105,12 @@ public class ContextFunctionCatalogAutoConfiguration { .collect(Collectors.toList()); mcList.add(new JsonMessageConverter(jsonMapper)); - mcList.add(new CloudEventJsonMessageConverter(jsonMapper)); mcList.add(new ByteArrayMessageConverter()); mcList.add(new StringMessageConverter()); mcList.add(new PrimitiveTypesFromStringMessageConverter(conversionService)); if (!CollectionUtils.isEmpty(mcList)) { messageConverter = new SmartCompositeMessageConverter(mcList); - CloudEventDataContentTypeMessagePreProcessor messagePreProcessor = new CloudEventDataContentTypeMessagePreProcessor(messageConverter); - messageConverter.setMessagePreProcessor(messagePreProcessor); } return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java index cb7a16798..68bf27209 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java @@ -20,6 +20,7 @@ import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.Collection; +import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -42,7 +43,8 @@ public class JsonMessageConverter extends AbstractMessageConverter { private final JsonMapper jsonMapper; public JsonMessageConverter(JsonMapper jsonMapper) { - this(jsonMapper, new MimeType("application", "json")); + this(jsonMapper, new MimeType("application", "json"), new MimeType(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType(), + CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype() + "+json")); } public JsonMessageConverter(JsonMapper jsonMapper, MimeType... supportedMimeTypes) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java index 2b256b533..7a12c2df9 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java @@ -18,7 +18,6 @@ package org.springframework.cloud.function.context.config; import java.util.Collection; import java.util.List; -import java.util.function.Function; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -38,8 +37,6 @@ import org.springframework.util.StringUtils; */ public class SmartCompositeMessageConverter extends CompositeMessageConverter { - private Function, Message> preProcessor; - public SmartCompositeMessageConverter(Collection converters) { super(converters); } @@ -47,9 +44,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { @Override @Nullable public Object fromMessage(Message message, Class targetClass) { - if (this.preProcessor != null) { - message = this.preProcessor.apply(message); - } for (MessageConverter converter : getConverters()) { Object result = converter.fromMessage(message, targetClass); if (result != null) { @@ -62,9 +56,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { @Override @Nullable public Object fromMessage(Message message, Class targetClass, @Nullable Object conversionHint) { - if (this.preProcessor != null) { - message = this.preProcessor.apply(message); - } for (MessageConverter converter : getConverters()) { Object result = (converter instanceof SmartMessageConverter ? ((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) : @@ -76,7 +67,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { return null; } - @SuppressWarnings("unchecked") @Override @Nullable public Message toMessage(Object payload, @Nullable MessageHeaders headers) { @@ -142,8 +132,4 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { } return null; } - - public void setMessagePreProcessor(Function, Message> preProcessor) { - this.preProcessor = preProcessor; - } } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java index d77f694a8..83b20f88e 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java @@ -45,7 +45,7 @@ public class CloudEventTypeConversionTests { @Test public void testFromMessageBinaryPayloadMatchesType() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); ceAttributes.setDataContentType("text/plain"); Message message = MessageBuilder.withPayload("Hello Ricky").copyHeaders(ceAttributes).build(); @@ -57,7 +57,7 @@ public class CloudEventTypeConversionTests { @Test public void testFromMessageBinaryPayloadDoesNotMatchType() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) @@ -71,7 +71,7 @@ public class CloudEventTypeConversionTests { @Test // JsonMessageConverter does some special things between byte[] and String so this works public void testFromMessageBinaryPayloadNoDataContentTypeToString() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) @@ -85,7 +85,7 @@ public class CloudEventTypeConversionTests { @Test // Unlike the previous test the type here is POJO so no special treatement public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) .setHeader(MessageHeaders.CONTENT_TYPE, @@ -98,7 +98,7 @@ public class CloudEventTypeConversionTests { @Test // will fall on default CT which is json public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes()) .copyHeaders(ceAttributes) .setHeader(MessageHeaders.CONTENT_TYPE, @@ -108,30 +108,6 @@ public class CloudEventTypeConversionTests { assertThat(converted.getName()).isEqualTo("Ricky"); } - @Test // will fall on default CT which is json - public void testFromMessageStructured() { - String cloudEventStructured = "{\n" + - " \"specversion\" : \"1.0\",\n" + - " \"type\" : \"org.springframework\",\n" + - " \"source\" : \"https://spring.io/\",\n" + - " \"id\" : \"A234-1234-1234\",\n" + - " \"datacontenttype\" : \"application/json\",\n" + - " \"data\" : {\n" + - " \"version\" : \"1.0\",\n" + - " \"releaseName\" : \"Spring Framework\",\n" + - " \"releaseDate\" : \"24-03-2004\"\n" + - " }\n" + - " }"; - SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - Message message = MessageBuilder.withPayload(cloudEventStructured) - .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") - .setHeader(CloudEventMessageUtils.CE_DATACONTENTTYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build(); - SpringReleaseEvent springReleaseEvent = (SpringReleaseEvent) messageConverter.fromMessage(message, - SpringReleaseEvent.class); - assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework"); - assertThat(springReleaseEvent.getVersion()).isEqualTo("1.0"); - } - private SmartCompositeMessageConverter configure(Class... configClass) { ApplicationContext context = new SpringApplicationBuilder(configClass).run( "--logging.level.org.springframework.cloud.function=DEBUG", "--spring.main.lazy-initialization=true"); diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index c23b0fab8..6e5cb1156 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -21,7 +21,7 @@ import java.util.function.Function; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.function.cloudevent.CloudEventAttributes; +import org.springframework.cloud.function.cloudevent.CloudEventAttributesHelper; import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.context.annotation.Bean; @@ -90,7 +90,7 @@ public class CloudeventDemoApplication { data.setVersion("2.0"); data.setReleaseDateAsString("01-10-2006"); - CloudEventAttributes ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders()) + CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders()) .setSource("https://interface21.com/") .setType("com.interface21"); diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index 3e2602afd..c841cff80 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -22,7 +22,6 @@ import java.net.URI; import java.text.SimpleDateFormat; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -30,10 +29,7 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.SpringApplication; import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; -import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter; import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; -import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; @@ -49,7 +45,6 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.AbstractMessageConverter; import org.springframework.messaging.converter.MessageConverter; import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; import org.springframework.util.SocketUtils; /** @@ -219,9 +214,9 @@ public class CloudeventDemoApplicationRESTTests { ResponseEntity response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2050\",\"releaseName\":\"Spring Framework\",\"version\":\"10.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("http://spring.io/application-application")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList(LinkedHashMap.class.getName())); } @@ -236,12 +231,40 @@ public class CloudeventDemoApplicationRESTTests { ResponseEntity response = testRestTemplate.exchange(re, String.class); assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}"); - assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) .isEqualTo(Collections.singletonList("http://spring.io/application-application")); - assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE)) + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); } + + /* + * Typically this would never happen since spec mandates that HTTP uses 'ce-` prefix. + * So this is to primarily validate that we can recognize it process it and still produce correct headers + */ + @Test + public void testAsBinaryPojoToPojoWrongHeaders() throws Exception { + SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {}); + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString()); + headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/"); + headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0"); + headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework"); + String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; + + RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo")); + ResponseEntity response = testRestTemplate.exchange(re, String.class); + + assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}"); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) + .isEqualTo(Collections.singletonList("http://spring.io/application-application")); + assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) + .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); + } + + @Test public void testAsStructuralPojoToPojo() throws Exception { ApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class); @@ -281,9 +304,9 @@ public class CloudeventDemoApplicationRESTTests { assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0"); -// assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE)) +// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_SOURCE)) // .isEqualTo(Collections.singletonList("http://spring.io/application-application")); -// assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE)) +// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_TYPE)) // .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName())); } @@ -294,10 +317,10 @@ public class CloudeventDemoApplicationRESTTests { private HttpHeaders buildHeaders(MediaType contentType) { HttpHeaders headers = new HttpHeaders(); headers.setContentType(contentType); - headers.set(CloudEventMessageUtils.CE_ID, UUID.randomUUID().toString()); - headers.set(CloudEventMessageUtils.CE_SOURCE, "https://spring.io/"); - headers.set(CloudEventMessageUtils.CE_SPECVERSION, "1.0"); - headers.set(CloudEventMessageUtils.CE_TYPE, "org.springframework"); + headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString()); + headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/"); + headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0"); + headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework"); return headers; }