diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 790cb2388..5cd9f04eb 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -275,9 +275,13 @@ public final class CloudEventMessageUtils { * @return prefix (e.g., 'ce_' or 'ce-' etc.) */ static String determinePrefixToUse(Map messageHeaders) { + return determinePrefixToUse(messageHeaders, false); + } + + static String determinePrefixToUse(Map messageHeaders, boolean strict) { String targetProtocol = (String) messageHeaders.get(MessageUtils.TARGET_PROTOCOL); String prefix = determinePrefixToUse(targetProtocol); - if (StringUtils.hasText(prefix)) { + if (StringUtils.hasText(prefix) && (strict || StringUtils.hasText((String) messageHeaders.get(prefix + _SPECVERSION)))) { return prefix; } else { @@ -294,7 +298,7 @@ public final class CloudEventMessageUtils { } } - return ""; + return DEFAULT_ATTR_PREFIX; } /** diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java index 32643c70e..4e29b1c7e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -90,7 +90,7 @@ public class CloudEventsFunctionInvocationHelper implements FunctionInvocationHe if (this.messageConverter != null && CLOUD_EVENT_CLASS != null && CLOUD_EVENT_CLASS.isAssignableFrom(result.getClass())) { convertedResult = this.messageConverter.toMessage(result, input.getHeaders()); } - String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders()); + String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders(), true); return this.doPostProcessResult(convertedResult, targetPrefix); } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java index 65d298f97..4745e2f2a 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java @@ -31,6 +31,26 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class CloudEventMessageUtilsAndBuilderTests { + @Test// see https://github.com/spring-cloud/spring-cloud-function/issues/680 + public void testProperAttributeExtractionRegardlessOfTargetProtocol() { + Message ceMessage = CloudEventMessageBuilder.withData("foo").build(); + ceMessage = MessageBuilder.fromMessage(ceMessage).setHeader("target-protocol", "kafka").build(); + + String prefix = CloudEventMessageUtils.determinePrefixToUse(ceMessage.getHeaders()); + assertThat(prefix).isEqualTo("ce-"); + prefix = CloudEventMessageUtils.determinePrefixToUse(ceMessage.getHeaders(), true); + assertThat(prefix).isEqualTo("ce_"); + + String specVersion = CloudEventMessageUtils.getSpecVersion(ceMessage); + assertThat(specVersion).isEqualTo("1.0"); + String type = CloudEventMessageUtils.getType(ceMessage); + assertThat(type).isEqualTo("java.lang.String"); + String id = CloudEventMessageUtils.getId(ceMessage); + assertThat(id).isNotNull(); + URI source = CloudEventMessageUtils.getSource(ceMessage); + assertThat(source.toString()).isEqualTo("https://spring.io/"); + } + @Test public void testAttributeRecognitionAndCanonicalization() { Message httpMessage = MessageBuilder.withPayload("hello")