GH-680 Add support for strict (or not) dependency on target-protocol header for CE
Resolves #680
This commit is contained in:
@@ -275,9 +275,13 @@ public final class CloudEventMessageUtils {
|
||||
* @return prefix (e.g., 'ce_' or 'ce-' etc.)
|
||||
*/
|
||||
static String determinePrefixToUse(Map<String, Object> messageHeaders) {
|
||||
return determinePrefixToUse(messageHeaders, false);
|
||||
}
|
||||
|
||||
static String determinePrefixToUse(Map<String, Object> messageHeaders, boolean strict) {
|
||||
String targetProtocol = (String) messageHeaders.get(MessageUtils.TARGET_PROTOCOL);
|
||||
String prefix = determinePrefixToUse(targetProtocol);
|
||||
if (StringUtils.hasText(prefix)) {
|
||||
if (StringUtils.hasText(prefix) && (strict || StringUtils.hasText((String) messageHeaders.get(prefix + _SPECVERSION)))) {
|
||||
return prefix;
|
||||
}
|
||||
else {
|
||||
@@ -294,7 +298,7 @@ public final class CloudEventMessageUtils {
|
||||
}
|
||||
}
|
||||
|
||||
return "";
|
||||
return DEFAULT_ATTR_PREFIX;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -90,7 +90,7 @@ public class CloudEventsFunctionInvocationHelper implements FunctionInvocationHe
|
||||
if (this.messageConverter != null && CLOUD_EVENT_CLASS != null && CLOUD_EVENT_CLASS.isAssignableFrom(result.getClass())) {
|
||||
convertedResult = this.messageConverter.toMessage(result, input.getHeaders());
|
||||
}
|
||||
String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders());
|
||||
String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders(), true);
|
||||
return this.doPostProcessResult(convertedResult, targetPrefix);
|
||||
}
|
||||
|
||||
|
||||
@@ -31,6 +31,26 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
*/
|
||||
public class CloudEventMessageUtilsAndBuilderTests {
|
||||
|
||||
@Test// see https://github.com/spring-cloud/spring-cloud-function/issues/680
|
||||
public void testProperAttributeExtractionRegardlessOfTargetProtocol() {
|
||||
Message<String> ceMessage = CloudEventMessageBuilder.withData("foo").build();
|
||||
ceMessage = MessageBuilder.fromMessage(ceMessage).setHeader("target-protocol", "kafka").build();
|
||||
|
||||
String prefix = CloudEventMessageUtils.determinePrefixToUse(ceMessage.getHeaders());
|
||||
assertThat(prefix).isEqualTo("ce-");
|
||||
prefix = CloudEventMessageUtils.determinePrefixToUse(ceMessage.getHeaders(), true);
|
||||
assertThat(prefix).isEqualTo("ce_");
|
||||
|
||||
String specVersion = CloudEventMessageUtils.getSpecVersion(ceMessage);
|
||||
assertThat(specVersion).isEqualTo("1.0");
|
||||
String type = CloudEventMessageUtils.getType(ceMessage);
|
||||
assertThat(type).isEqualTo("java.lang.String");
|
||||
String id = CloudEventMessageUtils.getId(ceMessage);
|
||||
assertThat(id).isNotNull();
|
||||
URI source = CloudEventMessageUtils.getSource(ceMessage);
|
||||
assertThat(source.toString()).isEqualTo("https://spring.io/");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAttributeRecognitionAndCanonicalization() {
|
||||
Message<String> httpMessage = MessageBuilder.withPayload("hello")
|
||||
|
||||
Reference in New Issue
Block a user