GH-804 Add support for case-insensitive Cloud Event determination

Resolves #804
This commit is contained in:
Oleg Zhurakousky
2022-02-09 14:23:21 +01:00
parent 5b4cec0db3
commit 2d59a0d759
4 changed files with 64 additions and 35 deletions

View File

@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.context.message.MessageUtils.MessageStructureWithCaseInsensitiveHeaderKeys;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -331,21 +332,22 @@ public final class CloudEventMessageUtils {
* @return true if this Message represents Cloud Event in binary-mode
*/
public static boolean isCloudEvent(Message<?> message) {
return (message.getHeaders().containsKey(SPECVERSION)
&& message.getHeaders().containsKey(TYPE)
&& message.getHeaders().containsKey(SOURCE))
MessageStructureWithCaseInsensitiveHeaderKeys _message = MessageUtils.toCaseInsensitiveHeadersStructure(message);
return (_message.getHeaders().containsKey(SPECVERSION)
&& _message.getHeaders().containsKey(TYPE)
&& _message.getHeaders().containsKey(SOURCE))
||
(message.getHeaders().containsKey(_SPECVERSION)
&& message.getHeaders().containsKey(_TYPE)
&& message.getHeaders().containsKey(_SOURCE))
(_message.getHeaders().containsKey(_SPECVERSION)
&& _message.getHeaders().containsKey(_TYPE)
&& _message.getHeaders().containsKey(_SOURCE))
||
(message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SPECVERSION)
&& message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _TYPE)
&& message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SOURCE))
(_message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SPECVERSION)
&& _message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _TYPE)
&& _message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SOURCE))
||
(message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SPECVERSION)
&& message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _TYPE)
&& message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SOURCE));
(_message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SPECVERSION)
&& _message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _TYPE)
&& _message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SOURCE));
}
private static boolean isAttribute(String key) {

View File

@@ -16,8 +16,6 @@
package org.springframework.cloud.function.context.config;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import org.apache.commons.logging.Log;
@@ -31,6 +29,7 @@ import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.function.context.MessageRoutingCallback.FunctionRoutingResult;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.Expression;
@@ -196,7 +195,7 @@ public class RoutingFunction implements Function<Object, Object> {
private FunctionInvocationWrapper functionFromExpression(String routingExpression, Object input) {
Expression expression = spelParser.parseExpression(routingExpression);
if (input instanceof Message) {
input = new MessageStructureWithCaseInsensitiveHeaderKeys((Message<?>) input);
input = MessageUtils.toCaseInsensitiveHeadersStructure((Message<?>) input);
}
String functionName = expression.getValue(this.evalContext, input, String.class);
@@ -209,24 +208,4 @@ public class RoutingFunction implements Function<Object, Object> {
}
return function;
}
@SuppressWarnings({"rawtypes", "unused"})
private static class MessageStructureWithCaseInsensitiveHeaderKeys {
private final Object payload;
private final Map headers;
@SuppressWarnings("unchecked")
MessageStructureWithCaseInsensitiveHeaderKeys(Message message) {
this.payload = message.getPayload();
this.headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.headers.putAll(message.getHeaders());
}
public Object getPayload() {
return payload;
}
public Map getHeaders() {
return headers;
}
}
}

View File

@@ -16,6 +16,11 @@
package org.springframework.cloud.function.context.message;
import java.util.Map;
import java.util.TreeMap;
import org.springframework.messaging.Message;
/**
* @author Dave Syer
* @author Oleg Zhurakousky
@@ -36,4 +41,34 @@ public abstract class MessageUtils {
* Value for 'target-protocol' typically use as header key.
*/
public static String SOURCE_TYPE = "source-type";
/**
* Returns (payload, headers) structure identical to `message` while substituting headers with case insensitive map.
*/
public static MessageStructureWithCaseInsensitiveHeaderKeys toCaseInsensitiveHeadersStructure(Message<?> message) {
return new MessageStructureWithCaseInsensitiveHeaderKeys(message);
}
/**
* !!! INTERNAL USE ONLY, MAY CHANGE OR REMOVED WITHOUT NOTICE!!!
*/
@SuppressWarnings({"rawtypes"})
public static class MessageStructureWithCaseInsensitiveHeaderKeys {
private final Object payload;
private final Map headers;
@SuppressWarnings("unchecked")
MessageStructureWithCaseInsensitiveHeaderKeys(Message message) {
this.payload = message.getPayload();
this.headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.headers.putAll(message.getHeaders());
}
public Object getPayload() {
return payload;
}
public Map getHeaders() {
return headers;
}
}
}

View File

@@ -31,6 +31,19 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class CloudEventMessageUtilsAndBuilderTests {
@Test// see https://github.com/spring-cloud/spring-cloud-function/issues/805
public void testHeaderKeyInsensitivity() {
Message<String> httpMessage = MessageBuilder.withPayload("hello")
.setHeader("cE-SoUrCe", "https://foo.bar")
.setHeader("Ce-specVeRsion", "1.0")
.setHeader("Ce-Type", "blah")
.setHeader("x", "x")
.setHeader("zzz", "zzz")
.build();
assertThat(CloudEventMessageUtils.isCloudEvent(httpMessage)).isTrue();
}
@Test// see https://github.com/spring-cloud/spring-cloud-function/issues/680
public void testProperAttributeExtractionRegardlessOfTargetProtocol() {
Message<String> ceMessage = CloudEventMessageBuilder.withData("foo").build();