GH-892 Remove use of reflection from CloudEventMessageUtils
Fixes #892 Resolves #894
This commit is contained in:
committed by
Oleg Zhurakousky
parent
5272714d89
commit
fc0dacb893
@@ -16,11 +16,12 @@
|
||||
|
||||
package org.springframework.cloud.function.cloudevent;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.URI;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
@@ -34,7 +35,6 @@ import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
@@ -45,6 +45,7 @@ import org.springframework.util.StringUtils;
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Dave Syer
|
||||
* @author Chris Bono
|
||||
* @since 3.1
|
||||
*/
|
||||
public final class CloudEventMessageUtils {
|
||||
@@ -61,12 +62,6 @@ public final class CloudEventMessageUtils {
|
||||
|
||||
};
|
||||
|
||||
private static Field MESSAGE_HEADERS = ReflectionUtils.findField(MessageHeaders.class, "headers");
|
||||
|
||||
static {
|
||||
MESSAGE_HEADERS.setAccessible(true);
|
||||
}
|
||||
|
||||
private CloudEventMessageUtils() {
|
||||
}
|
||||
|
||||
@@ -226,15 +221,16 @@ public final class CloudEventMessageUtils {
|
||||
|
||||
/**
|
||||
* This method does several things.
|
||||
* First in canonicalizes Cloud Events attributes ensuring that they all prefixed
|
||||
* First it canonicalizes Cloud Events attributes ensuring that they are all prefixed
|
||||
* with 'ce-' prefix regardless where they came from.
|
||||
* It also transforms structured-mode Cloud Event to binary-mode and then it canonicalizes attributes
|
||||
* It also transforms structured-mode Cloud Event to binary-mode and then canonicalizes attributes
|
||||
* as well as described in the previous sentence.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
static Message<?> toCanonical(Message<?> inputMessage, MessageConverter messageConverter) {
|
||||
Map<String, Object> headers = (Map<String, Object>) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders());
|
||||
canonicalizeHeaders(headers, false);
|
||||
inputMessage = canonicalizeHeadersWithPossibleCopy(inputMessage);
|
||||
Map<String, Object> headers = new HashMap<>(inputMessage.getHeaders());
|
||||
|
||||
if (isCloudEvent(inputMessage) && headers.containsKey("content-type")) {
|
||||
inputMessage = MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, headers.get("content-type")).build();
|
||||
}
|
||||
@@ -272,6 +268,67 @@ public final class CloudEventMessageUtils {
|
||||
return inputMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to {@link #canonicalizeHeaders canonicalize} the headers of a message.
|
||||
* @param message the message
|
||||
* @return a copy of the message with the canonicalized headers or the passed in unmodified message if no
|
||||
* headers were canonicalized
|
||||
*/
|
||||
// VisibleForTesting
|
||||
static Message<?> canonicalizeHeadersWithPossibleCopy(Message<?> message) {
|
||||
Map<String, Object> headers = new HashMap<>(message.getHeaders());
|
||||
boolean headersModified = canonicalizeHeaders(headers, false);
|
||||
if (headersModified) {
|
||||
message = MessageBuilder.fromMessage(message)
|
||||
.removeHeaders("*")
|
||||
.copyHeaders(headers)
|
||||
.build();
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
* Will canonicalize Cloud Event attributes (headers) by ensuring canonical
|
||||
* prefix for all attributes and extensions regardless of where they came from.
|
||||
* The canonical prefix is 'ce-'.
|
||||
*
|
||||
* So, for example 'ce_source' will become 'ce-source'.
|
||||
* @param headers message headers
|
||||
* @param structured boolean signifying that headers map represents structured Cloud Event
|
||||
* at which point attributes without any prefix will still be treated as
|
||||
* Cloud Event attributes.
|
||||
* @return whether the headers were modified during the process
|
||||
*/
|
||||
private static boolean canonicalizeHeaders(Map<String, Object> headers, boolean structured) {
|
||||
boolean modified = false;
|
||||
String[] keys = headers.keySet().toArray(new String[] {});
|
||||
for (String key : keys) {
|
||||
if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
String newKey = DEFAULT_ATTR_PREFIX + key.substring(DEFAULT_ATTR_PREFIX.length());
|
||||
headers.put(newKey, value);
|
||||
modified |= (!Objects.equals(key, newKey));
|
||||
}
|
||||
else if (key.startsWith(KAFKA_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(KAFKA_ATTR_PREFIX.length());
|
||||
headers.put(DEFAULT_ATTR_PREFIX + key, value);
|
||||
modified = true;
|
||||
}
|
||||
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(AMQP_ATTR_PREFIX.length());
|
||||
headers.put(DEFAULT_ATTR_PREFIX + key, value);
|
||||
modified = true;
|
||||
}
|
||||
else if (structured) {
|
||||
Object value = headers.remove(key);
|
||||
headers.put(DEFAULT_ATTR_PREFIX + key, value);
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
return modified;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines attribute prefix based on the presence of certain well defined headers.
|
||||
@@ -354,42 +411,6 @@ public final class CloudEventMessageUtils {
|
||||
return key.startsWith(DEFAULT_ATTR_PREFIX) || key.startsWith(AMQP_ATTR_PREFIX) || key.startsWith(KAFKA_ATTR_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Will canonicalize Cloud Event attributes (headers) by ensuring canonical
|
||||
* prefix for all attributes and extensions regardless of where they came from.
|
||||
* The canonical prefix is 'ce-'.
|
||||
*
|
||||
* So, for example 'ce_source' will become 'ce-source'.
|
||||
* @param headers message headers
|
||||
* @param structured boolean signifying that headers map represents structured Cloud Event
|
||||
* at which point attributes without any prefix will still be treated as
|
||||
* Cloud Event attributes.
|
||||
*/
|
||||
private static void canonicalizeHeaders(Map<String, Object> headers, boolean structured) {
|
||||
String[] keys = headers.keySet().toArray(new String[] {});
|
||||
for (String key : keys) {
|
||||
if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(DEFAULT_ATTR_PREFIX.length());
|
||||
headers.put(DEFAULT_ATTR_PREFIX + key, value);
|
||||
}
|
||||
else if (key.startsWith(KAFKA_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(KAFKA_ATTR_PREFIX.length());
|
||||
headers.put(DEFAULT_ATTR_PREFIX + key, value);
|
||||
}
|
||||
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
|
||||
Object value = headers.remove(key);
|
||||
key = key.substring(AMQP_ATTR_PREFIX.length());
|
||||
headers.put(DEFAULT_ATTR_PREFIX + key, value);
|
||||
}
|
||||
else if (structured) {
|
||||
Object value = headers.remove(key);
|
||||
headers.put(DEFAULT_ATTR_PREFIX + key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static Message<?> buildBinaryMessageFromStructuredMap(Map<String, Object> structuredCloudEvent,
|
||||
MessageHeaders originalHeaders) {
|
||||
Object payload = structuredCloudEvent.remove(DATA);
|
||||
|
||||
@@ -97,4 +97,45 @@ public class CloudEventMessageUtilsAndBuilderTests {
|
||||
assertThat(httpMessage.getHeaders().get("ce-specversion")).isNotNull();
|
||||
assertThat(CloudEventMessageUtils.getSpecVersion(httpMessage)).isEqualTo("1.0");
|
||||
}
|
||||
|
||||
@Test
|
||||
void canonicalizeHeadersWithPossibleCopyReturnsCopyWithUpdatedHeadersWhenModified() {
|
||||
// TODO add the following test cases
|
||||
//
|
||||
// defaultAttrs w/ unmodified keys -> not modified
|
||||
// defaultAttrs w/ modified keys -> modified
|
||||
// kafkaAttrs w/ (defaultAttrs+unmodified keys) -> modified
|
||||
// amqpAttrs -> modified
|
||||
// structured -> modified
|
||||
Message<?> inputMessage = MessageBuilder.withPayload("hello")
|
||||
.setHeader("ce_foo", "bar")
|
||||
.setHeader("x", "x1")
|
||||
.setHeader("x|x", "x2")
|
||||
.build();
|
||||
|
||||
Message<?> updatedMessage = CloudEventMessageUtils.canonicalizeHeadersWithPossibleCopy(inputMessage);
|
||||
|
||||
assertThat(inputMessage).isNotSameAs(updatedMessage);
|
||||
assertThat(updatedMessage.getHeaders())
|
||||
.containsEntry("ce-foo", "bar")
|
||||
.containsEntry("x", "x1")
|
||||
.containsEntry("x|x", "x2");
|
||||
}
|
||||
|
||||
@Test
|
||||
void canonicalizeHeadersWithPossibleCopyReturnsSameInstanceWhenNotModified() {
|
||||
Message<?> inputMessage = MessageBuilder.withPayload("hello")
|
||||
.setHeader("ce-foo", "bar")
|
||||
.setHeader("x", "x1")
|
||||
.setHeader("x|x", "x2")
|
||||
.build();
|
||||
|
||||
Message<?> updatedMessage = CloudEventMessageUtils.canonicalizeHeadersWithPossibleCopy(inputMessage);
|
||||
|
||||
assertThat(inputMessage).isSameAs(updatedMessage);
|
||||
assertThat(updatedMessage.getHeaders())
|
||||
.containsEntry("ce-foo", "bar")
|
||||
.containsEntry("x", "x1")
|
||||
.containsEntry("x|x", "x2");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user