Fix Cloud Events support for Message functions

Ensured Cloud Event completness by adding assertion for required attributes as well as generation of default values for attributes such as ID and SPECVERSION
This commit is contained in:
Oleg Zhurakousky
2020-12-02 18:22:17 +01:00
parent 923d5204e4
commit aede56dfc6
8 changed files with 124 additions and 59 deletions

View File

@@ -26,15 +26,18 @@ import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* Message builder which is aware of Cloud Event semantics.
* It provides type-safe setters for v1.0 Cloud Event attributes while
* supporting any version by exposing a convenient {@link #setHeader(String, Object)} method.
* supporting any version by exposing a convenient
* {@link #setHeader(String, Object)} method.
*
* @author Oleg Zhurakousky
* @since 3.1
@@ -139,13 +142,9 @@ public final class CloudEventMessageBuilder<T> {
}
public Message<T> build() {
if (!this.headers.containsKey(CloudEventMessageUtils.SPECVERSION)) {
this.headers.put(CloudEventMessageUtils.SPECVERSION, "1.0");
}
return this.doBuild();
return this.doBuild(CloudEventMessageUtils.determinePrefixToUse(this.headers));
}
public Message<T> build(String attributePrefixToUse) {
if (StringUtils.hasText(attributePrefixToUse)) {
String[] keys = this.headers.keySet().toArray(new String[] {});
@@ -153,62 +152,50 @@ public final class CloudEventMessageBuilder<T> {
if (key.startsWith(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX.length());
headers.put(attributePrefixToUse + key, value);
this.headers.put(attributePrefixToUse + key, value);
}
else if (key.startsWith(CloudEventMessageUtils.AMQP_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(CloudEventMessageUtils.AMQP_ATTR_PREFIX.length());
headers.put(attributePrefixToUse + key, value);
this.headers.put(attributePrefixToUse + key, value);
}
else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(CloudEventMessageUtils.KAFKA_ATTR_PREFIX.length());
headers.put(attributePrefixToUse + key, value);
this.headers.put(attributePrefixToUse + key, value);
}
}
}
if (!this.headers.containsKey(attributePrefixToUse + "specversion")) {
String prefix = StringUtils.hasText(attributePrefixToUse) ? attributePrefixToUse : CloudEventMessageUtils.DEFAULT_ATTR_PREFIX;
String prefix = StringUtils.hasText(attributePrefixToUse)
? attributePrefixToUse
: CloudEventMessageUtils.DEFAULT_ATTR_PREFIX;
return doBuild(prefix);
}
private Message<T> doBuild(String prefix) {
if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SPECVERSION)) {
this.headers.put(prefix + CloudEventMessageUtils._SPECVERSION, "1.0");
}
return doBuild();
}
private Message<T> doBuild() {
this.headers.put("message-type", "cloudevent");
CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, this.getUUID(), null);
GenericMessage<T> message = new GenericMessage<T>(data, headers);
return message;
}
private UUID getUUID() {
UUID id = null;
if (this.headers.containsKey(CloudEventMessageUtils.ID)) {
String stringId = this.headers.get(CloudEventMessageUtils.ID).toString();
try {
id = UUID.fromString(stringId);
}
catch (Exception e) {
logger.info("Provided Cloud Event 'id' is not compatible with Message 'id' which is UUID, "
+ "therefore Cloud Event 'id' will be written as '_id' message header");
this.headers.put("_" + CloudEventMessageUtils.ID, stringId);
this.headers.remove(CloudEventMessageUtils.ID);
}
if (!this.headers.containsKey(prefix + CloudEventMessageUtils._ID)) {
this.headers.put(prefix + CloudEventMessageUtils._ID, UUID.randomUUID().toString());
}
return id;
this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, null, null);
GenericMessage<T> message = new GenericMessage<T>(this.data, headers);
Assert.hasText(CloudEventMessageUtils.getSpecVersion(message), "'specversion' must not be null or empty");
Assert.notNull(CloudEventMessageUtils.getSource(message), "'source' must not be null");
Assert.hasText(CloudEventMessageUtils.getType(message), "'type' must not be null or empty");
Assert.hasText(CloudEventMessageUtils.getId(message), "'id' must not be null or empty");
return message;
}
private static class CloudEventMessageHeaders extends MessageHeaders {
/**
*
*/
private static final long serialVersionUID = -6424866731588545945L;
protected CloudEventMessageHeaders(Map<String, Object> headers, UUID id, Long timestamp) {
super(headers, id, timestamp);
}
}
}

View File

@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.ContentTypeResolver;
@@ -79,6 +80,11 @@ public final class CloudEventMessageUtils {
static String _TIME = "time";
// ================================
/**
* String value of 'cloudevent'. Typically used as {@link MessageUtils#MESSAGE_TYPE}
*/
public static String CLOUDEVENT_VALUE = "cloudevent";
/**
* String value of 'application/cloudevents' mime type.
*/
@@ -156,9 +162,9 @@ public final class CloudEventMessageUtils {
public static String getId(Message<?> message) {
if (message.getHeaders().containsKey("_id")) {
return (String) message.getHeaders().get("_id");
}
// if (message.getHeaders().containsKey("_id")) {
// return (String) message.getHeaders().get("_id");
// }
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + MessageHeaders.ID);
}

View File

@@ -34,6 +34,8 @@ import org.springframework.messaging.Message;
@Configuration(proxyBeanMethods = false)
class CloudEventsFunctionExtensionConfiguration {
// The following two beans are intended to be mutually exclusive. Only one should be activated based
// on the presence of Cloud Event SDK API
@Bean
@ConditionalOnMissingClass("io.cloudevents.CloudEvent")
@ConditionalOnMissingBean
@@ -48,4 +50,5 @@ class CloudEventsFunctionExtensionConfiguration {
// TODO you may need SDKs header provider
return null;
}
// ========================================================
}

View File

@@ -20,6 +20,7 @@ import java.net.URI;
import java.util.UUID;
import org.springframework.beans.BeansException;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@@ -36,7 +37,7 @@ import org.springframework.util.StringUtils;
* This is a primary (and the only) integration bridge with {@link FunctionInvocationHelper}.
*
* @author Oleg Zhurakousky
* @since 2.0
* @since 3.1
*
*/
class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Message<?>>, ApplicationContextAware {
@@ -51,7 +52,8 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
@Override
public boolean isRetainOuputAsMessage(Message<?> message) {
if (message.getHeaders().containsKey("message-type") && message.getHeaders().get("message-type").equals("cloudevent")) {
if (message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE)
&& message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE)) {
return true;
}
return false;
@@ -64,13 +66,19 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
@Override
public Message<?> postProcessResult(Message<?> input, Object result) {
Message<?> resultMessage = null;
Message<?> resultMessage = result instanceof Message ? (Message<?>) result : null;
if (CloudEventMessageUtils.isCloudEvent(input)) {
CloudEventMessageBuilder<?> messageBuilder = CloudEventMessageBuilder
.withData(result)
.setId(UUID.randomUUID().toString())
.setSource(URI.create("http://spring.io/" + getApplicationName()))
.setType(result.getClass().getName());
CloudEventMessageBuilder<?> messageBuilder;
if (result instanceof Message) {
messageBuilder = CloudEventMessageBuilder.fromMessage((Message<?>) result);
}
else {
messageBuilder = CloudEventMessageBuilder
.withData(result)
.setId(UUID.randomUUID().toString())
.setSource(URI.create("http://spring.io/" + getApplicationName()))
.setType(result.getClass().getName());
}
if (this.cloudEventAttributesProvider != null) {
messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder);
@@ -80,9 +88,10 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
resultMessage = messageBuilder.build(prefix);
}
else {
else if (!(result instanceof Message<?>)) {
resultMessage = MessageBuilder.withPayload(result).build();
}
return resultMessage;
}

View File

@@ -621,6 +621,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
.getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders());
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
if (functionInvocationHelper != null) {
result = functionInvocationHelper.postProcessResult((Message<?>) input, result);
}
}
else {
if (functionInvocationHelper != null) {

View File

@@ -30,10 +30,20 @@ import org.springframework.util.ReflectionUtils;
/**
* @author Dave Syer
*
* @author Oleg Zhurakousky
*/
public abstract class MessageUtils {
/**
* Value for 'message-type' typically use as header key.
*/
public static String MESSAGE_TYPE = "message-type";
/**
* Value for 'target-protocol' typically use as header key.
*/
public static String TARGET_PROTOCOL = "target-protocol";
/**
* Create a message for the handler. If the handler is a wrapper for a function in an
* isolated class loader, then the message will be created with the target class

View File

@@ -56,7 +56,6 @@ public class CloudEventFunctionTests {
.setType("org.springframework")
.build();
assertThat(inputMessage.getHeaders().getId()).isEqualTo(UUID.fromString(id));
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue();
Message<Person> resultMessage = (Message<Person>) function.apply(inputMessage);
@@ -87,8 +86,6 @@ public class CloudEventFunctionTests {
.setHeader("ce_type", "org.springframework")
.build();
// assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue();
Message<Person> resultMessage = (Message<Person>) function.apply(inputMessage);
/*
@@ -139,6 +136,44 @@ public class CloudEventFunctionTests {
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application"));
}
@SuppressWarnings("unchecked")
@Test
public void testStructuredPojoToPojoMessageFunction() throws Exception {
String payload = "{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"org.springframework\",\n" +
" \"source\" : \"https://spring.io/\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"datacontenttype\" : \"application/json\",\n" +
" \"data\" : {\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }\n" +
"}";
Function<Object, Object> function = this.lookup("springReleaseAsMessage", TestConfiguration.class);
Message<String> inputMessage = MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.build();
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse();
Message<SpringReleaseEvent> resultMessage = (Message<SpringReleaseEvent>) function.apply(inputMessage);
assertThat(resultMessage.getPayload().getReleaseDate())
.isEqualTo(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006"));
assertThat(resultMessage.getPayload().getVersion()).isEqualTo("2.0");
// /*
// * Validates that although user only deals with POJO, the framework recognizes
// * both on input and output that it is dealing with Cloud Event and generates
// * appropriate headers/attributes
// */
assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue();
assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName());
assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("https://spring.release.event"));
}
@SuppressWarnings("unchecked")
@Test
public void testStructuredPojoToPojoDefaultOutputAttributeProviderNoDataContentType() throws Exception {
@@ -155,8 +190,8 @@ public class CloudEventFunctionTests {
"}";
Function<Object, Object> function = this.lookup("springRelease", TestConfiguration.class);
Message<String> inputMessage = CloudEventMessageBuilder
.withData(payload)
Message<String> inputMessage = MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.build();
assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse();
@@ -202,6 +237,18 @@ public class CloudEventFunctionTests {
}
};
}
@Bean
Function<Message<SpringReleaseEvent>, Message<SpringReleaseEvent>> springReleaseAsMessage() {
return message -> {
SpringReleaseEvent updated = springRelease().apply(message.getPayload());
return CloudEventMessageBuilder.withData(updated)
.copyHeaders(message.getHeaders())
.setSource("https://spring.release.event")
.setType(SpringReleaseEvent.class.getName())
.build();
};
}
}
public static class Person {

View File

@@ -46,7 +46,7 @@ public class CloudEventMessageUtilsAndBuilderTests {
Message<String> kafkaMessage = CloudEventMessageBuilder.fromMessage(httpMessage).build(CloudEventMessageUtils.KAFKA_ATTR_PREFIX);
attributes = CloudEventMessageUtils.getAttributes(kafkaMessage);
assertThat(attributes.size()).isEqualTo(3);
assertThat(attributes.size()).isEqualTo(4); // id will be auto injected, so always at least 4 (as tehre are 4 required attributes in CE)
assertThat(kafkaMessage.getHeaders().get("ce_source")).isNotNull();
assertThat(CloudEventMessageUtils.getSource(kafkaMessage)).isEqualTo(URI.create("https://foo.bar"));
assertThat(kafkaMessage.getHeaders().get("ce_type")).isNotNull();
@@ -56,7 +56,7 @@ public class CloudEventMessageUtilsAndBuilderTests {
httpMessage = CloudEventMessageBuilder.fromMessage(kafkaMessage).build(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX);
attributes = CloudEventMessageUtils.getAttributes(httpMessage);
assertThat(attributes.size()).isEqualTo(3);
assertThat(attributes.size()).isEqualTo(4); //
assertThat(httpMessage.getHeaders().get("ce-source")).isNotNull();
assertThat(CloudEventMessageUtils.getSource(httpMessage)).isEqualTo(URI.create("https://foo.bar"));
assertThat(httpMessage.getHeaders().get("ce-type")).isNotNull();