diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 55ab9a49a..552ce4f12 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -32,6 +32,7 @@ import org.springframework.messaging.converter.ContentTypeResolver; import org.springframework.messaging.converter.DefaultContentTypeResolver; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.StringUtils; @@ -228,15 +229,16 @@ public final class CloudEventMessageUtils { static Message toCanonical(Message inputMessage, MessageConverter messageConverter) { Map headers = new HashMap<>(inputMessage.getHeaders()); canonicalizeHeaders(headers, false); - if (isCloudEvent(inputMessage) && headers.containsKey("content-type")) { + boolean isCloudEvent = isCloudEvent(inputMessage); + if (isCloudEvent && headers.containsKey("content-type")) { inputMessage = MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, headers.get("content-type")).build(); } String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE); + MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders()); // first check the obvious and see if content-type is `cloudevents` - if (!isCloudEvent(inputMessage) && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { + if (!isCloudEvent && contentType != null) { // structured-mode - MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders()); if (contentType.getType().equals(APPLICATION_CLOUDEVENTS.getType()) && contentType .getSubtype().startsWith(APPLICATION_CLOUDEVENTS.getSubtype())) { @@ -251,7 +253,7 @@ public final class CloudEventMessageUtils { .setHeader(DATACONTENTTYPE, dataContentType).build(); Map structuredCloudEvent = (Map) messageConverter .fromMessage(cloudEventMessage, Map.class); - + Assert.notEmpty(structuredCloudEvent, "Failed to convert CloudEvent from structured mode"); canonicalizeHeaders(structuredCloudEvent, true); return buildBinaryMessageFromStructuredMap(structuredCloudEvent, inputMessage.getHeaders()); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java index 388b0f057..0ad602b4e 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java @@ -391,6 +391,7 @@ public class CloudEventFunctionTests { Function, Message> springReleaseAsMessage() { return message -> { SpringReleaseEvent updated = springRelease().apply(message.getPayload()); + assertThat(message.getHeaders().get("ce-type")).isEqualTo("org.springframework"); return CloudEventMessageBuilder.withData(updated) .copyHeaders(message.getHeaders()) .setSource("https://spring.release.event")