GH-804 Add support for case-insensitive Cloud Event determination
Resolves #804
This commit is contained in:
@@ -24,6 +24,7 @@ import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils.MessageStructureWithCaseInsensitiveHeaderKeys;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
@@ -331,21 +332,22 @@ public final class CloudEventMessageUtils {
|
||||
* @return true if this Message represents Cloud Event in binary-mode
|
||||
*/
|
||||
public static boolean isCloudEvent(Message<?> message) {
|
||||
return (message.getHeaders().containsKey(SPECVERSION)
|
||||
&& message.getHeaders().containsKey(TYPE)
|
||||
&& message.getHeaders().containsKey(SOURCE))
|
||||
MessageStructureWithCaseInsensitiveHeaderKeys _message = MessageUtils.toCaseInsensitiveHeadersStructure(message);
|
||||
return (_message.getHeaders().containsKey(SPECVERSION)
|
||||
&& _message.getHeaders().containsKey(TYPE)
|
||||
&& _message.getHeaders().containsKey(SOURCE))
|
||||
||
|
||||
(message.getHeaders().containsKey(_SPECVERSION)
|
||||
&& message.getHeaders().containsKey(_TYPE)
|
||||
&& message.getHeaders().containsKey(_SOURCE))
|
||||
(_message.getHeaders().containsKey(_SPECVERSION)
|
||||
&& _message.getHeaders().containsKey(_TYPE)
|
||||
&& _message.getHeaders().containsKey(_SOURCE))
|
||||
||
|
||||
(message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SPECVERSION)
|
||||
&& message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _TYPE)
|
||||
&& message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SOURCE))
|
||||
(_message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SPECVERSION)
|
||||
&& _message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _TYPE)
|
||||
&& _message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SOURCE))
|
||||
||
|
||||
(message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SPECVERSION)
|
||||
&& message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _TYPE)
|
||||
&& message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SOURCE));
|
||||
(_message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SPECVERSION)
|
||||
&& _message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _TYPE)
|
||||
&& _message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SOURCE));
|
||||
}
|
||||
|
||||
private static boolean isAttribute(String key) {
|
||||
|
||||
@@ -16,8 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.function.context.config;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@@ -31,6 +29,7 @@ import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.MessageRoutingCallback;
|
||||
import org.springframework.cloud.function.context.MessageRoutingCallback.FunctionRoutingResult;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.context.expression.MapAccessor;
|
||||
import org.springframework.expression.BeanResolver;
|
||||
import org.springframework.expression.Expression;
|
||||
@@ -196,7 +195,7 @@ public class RoutingFunction implements Function<Object, Object> {
|
||||
private FunctionInvocationWrapper functionFromExpression(String routingExpression, Object input) {
|
||||
Expression expression = spelParser.parseExpression(routingExpression);
|
||||
if (input instanceof Message) {
|
||||
input = new MessageStructureWithCaseInsensitiveHeaderKeys((Message<?>) input);
|
||||
input = MessageUtils.toCaseInsensitiveHeadersStructure((Message<?>) input);
|
||||
}
|
||||
|
||||
String functionName = expression.getValue(this.evalContext, input, String.class);
|
||||
@@ -209,24 +208,4 @@ public class RoutingFunction implements Function<Object, Object> {
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unused"})
|
||||
private static class MessageStructureWithCaseInsensitiveHeaderKeys {
|
||||
private final Object payload;
|
||||
private final Map headers;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
MessageStructureWithCaseInsensitiveHeaderKeys(Message message) {
|
||||
this.payload = message.getPayload();
|
||||
this.headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
|
||||
this.headers.putAll(message.getHeaders());
|
||||
}
|
||||
public Object getPayload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
public Map getHeaders() {
|
||||
return headers;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.function.context.message;
|
||||
|
||||
<<<<<<< HEAD
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
@@ -27,6 +28,12 @@ import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
=======
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
>>>>>>> 8f15b9ba... GH-804 Add support for case-insensitive Cloud Event determination
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
@@ -50,6 +57,7 @@ public abstract class MessageUtils {
|
||||
public static String SOURCE_TYPE = "source-type";
|
||||
|
||||
/**
|
||||
<<<<<<< HEAD
|
||||
* Create a message for the handler. If the handler is a wrapper for a function in an
|
||||
* isolated class loader, then the message will be created with the target class
|
||||
* loader (therefore the {@link Message} class must be on the classpath of the target
|
||||
@@ -124,4 +132,33 @@ public abstract class MessageUtils {
|
||||
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns (payload, headers) structure identical to `message` while substituting headers with case insensitive map.
|
||||
*/
|
||||
public static MessageStructureWithCaseInsensitiveHeaderKeys toCaseInsensitiveHeadersStructure(Message<?> message) {
|
||||
return new MessageStructureWithCaseInsensitiveHeaderKeys(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* !!! INTERNAL USE ONLY, MAY CHANGE OR REMOVED WITHOUT NOTICE!!!
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes"})
|
||||
public static class MessageStructureWithCaseInsensitiveHeaderKeys {
|
||||
private final Object payload;
|
||||
private final Map headers;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
MessageStructureWithCaseInsensitiveHeaderKeys(Message message) {
|
||||
this.payload = message.getPayload();
|
||||
this.headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
|
||||
this.headers.putAll(message.getHeaders());
|
||||
}
|
||||
public Object getPayload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
public Map getHeaders() {
|
||||
return headers;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,19 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
*/
|
||||
public class CloudEventMessageUtilsAndBuilderTests {
|
||||
|
||||
@Test// see https://github.com/spring-cloud/spring-cloud-function/issues/805
|
||||
public void testHeaderKeyInsensitivity() {
|
||||
Message<String> httpMessage = MessageBuilder.withPayload("hello")
|
||||
.setHeader("cE-SoUrCe", "https://foo.bar")
|
||||
.setHeader("Ce-specVeRsion", "1.0")
|
||||
.setHeader("Ce-Type", "blah")
|
||||
.setHeader("x", "x")
|
||||
.setHeader("zzz", "zzz")
|
||||
.build();
|
||||
|
||||
assertThat(CloudEventMessageUtils.isCloudEvent(httpMessage)).isTrue();
|
||||
}
|
||||
|
||||
@Test// see https://github.com/spring-cloud/spring-cloud-function/issues/680
|
||||
public void testProperAttributeExtractionRegardlessOfTargetProtocol() {
|
||||
Message<String> ceMessage = CloudEventMessageBuilder.withData("foo").build();
|
||||
|
||||
Reference in New Issue
Block a user