From aede56dfc6930ee155a2f0180074122ae41e06d8 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 2 Dec 2020 18:22:17 +0100 Subject: [PATCH] Fix Cloud Events support for Message functions Ensured Cloud Event completness by adding assertion for required attributes as well as generation of default values for attributes such as ID and SPECVERSION --- .../cloudevent/CloudEventMessageBuilder.java | 65 ++++++++----------- .../cloudevent/CloudEventMessageUtils.java | 12 +++- ...dEventsFunctionExtensionConfiguration.java | 3 + .../CloudEventsFunctionInvocationHelper.java | 27 +++++--- .../catalog/SimpleFunctionRegistry.java | 3 + .../context/message/MessageUtils.java | 12 +++- .../cloudevent/CloudEventFunctionTests.java | 57 ++++++++++++++-- ...CloudEventMessageUtilsAndBuilderTests.java | 4 +- 8 files changed, 124 insertions(+), 59 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index c99a83ac7..a7a245389 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -26,15 +26,18 @@ import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** * Message builder which is aware of Cloud Event semantics. * It provides type-safe setters for v1.0 Cloud Event attributes while - * supporting any version by exposing a convenient {@link #setHeader(String, Object)} method. + * supporting any version by exposing a convenient + * {@link #setHeader(String, Object)} method. * * @author Oleg Zhurakousky * @since 3.1 @@ -139,13 +142,9 @@ public final class CloudEventMessageBuilder { } public Message build() { - if (!this.headers.containsKey(CloudEventMessageUtils.SPECVERSION)) { - this.headers.put(CloudEventMessageUtils.SPECVERSION, "1.0"); - } - return this.doBuild(); + return this.doBuild(CloudEventMessageUtils.determinePrefixToUse(this.headers)); } - public Message build(String attributePrefixToUse) { if (StringUtils.hasText(attributePrefixToUse)) { String[] keys = this.headers.keySet().toArray(new String[] {}); @@ -153,62 +152,50 @@ public final class CloudEventMessageBuilder { if (key.startsWith(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX)) { Object value = headers.remove(key); key = key.substring(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX.length()); - headers.put(attributePrefixToUse + key, value); + this.headers.put(attributePrefixToUse + key, value); } else if (key.startsWith(CloudEventMessageUtils.AMQP_ATTR_PREFIX)) { Object value = headers.remove(key); key = key.substring(CloudEventMessageUtils.AMQP_ATTR_PREFIX.length()); - headers.put(attributePrefixToUse + key, value); + this.headers.put(attributePrefixToUse + key, value); } else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) { Object value = headers.remove(key); key = key.substring(CloudEventMessageUtils.KAFKA_ATTR_PREFIX.length()); - headers.put(attributePrefixToUse + key, value); + this.headers.put(attributePrefixToUse + key, value); } } } - if (!this.headers.containsKey(attributePrefixToUse + "specversion")) { - String prefix = StringUtils.hasText(attributePrefixToUse) ? attributePrefixToUse : CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; + String prefix = StringUtils.hasText(attributePrefixToUse) + ? attributePrefixToUse + : CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; + return doBuild(prefix); + } + + private Message doBuild(String prefix) { + if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SPECVERSION)) { this.headers.put(prefix + CloudEventMessageUtils._SPECVERSION, "1.0"); } - return doBuild(); - } - - private Message doBuild() { - this.headers.put("message-type", "cloudevent"); - CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, this.getUUID(), null); - GenericMessage message = new GenericMessage(data, headers); - return message; - } - - private UUID getUUID() { - UUID id = null; - if (this.headers.containsKey(CloudEventMessageUtils.ID)) { - String stringId = this.headers.get(CloudEventMessageUtils.ID).toString(); - try { - id = UUID.fromString(stringId); - } - catch (Exception e) { - logger.info("Provided Cloud Event 'id' is not compatible with Message 'id' which is UUID, " - + "therefore Cloud Event 'id' will be written as '_id' message header"); - this.headers.put("_" + CloudEventMessageUtils.ID, stringId); - this.headers.remove(CloudEventMessageUtils.ID); - } + if (!this.headers.containsKey(prefix + CloudEventMessageUtils._ID)) { + this.headers.put(prefix + CloudEventMessageUtils._ID, UUID.randomUUID().toString()); } - return id; + this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); + CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, null, null); + GenericMessage message = new GenericMessage(this.data, headers); + Assert.hasText(CloudEventMessageUtils.getSpecVersion(message), "'specversion' must not be null or empty"); + Assert.notNull(CloudEventMessageUtils.getSource(message), "'source' must not be null"); + Assert.hasText(CloudEventMessageUtils.getType(message), "'type' must not be null or empty"); + Assert.hasText(CloudEventMessageUtils.getId(message), "'id' must not be null or empty"); + return message; } private static class CloudEventMessageHeaders extends MessageHeaders { - /** - * - */ private static final long serialVersionUID = -6424866731588545945L; protected CloudEventMessageHeaders(Map headers, UUID id, Long timestamp) { super(headers, id, timestamp); } - } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 2cf107cf9..805f7fefb 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.ContentTypeResolver; @@ -79,6 +80,11 @@ public final class CloudEventMessageUtils { static String _TIME = "time"; // ================================ + /** + * String value of 'cloudevent'. Typically used as {@link MessageUtils#MESSAGE_TYPE} + */ + public static String CLOUDEVENT_VALUE = "cloudevent"; + /** * String value of 'application/cloudevents' mime type. */ @@ -156,9 +162,9 @@ public final class CloudEventMessageUtils { public static String getId(Message message) { - if (message.getHeaders().containsKey("_id")) { - return (String) message.getHeaders().get("_id"); - } +// if (message.getHeaders().containsKey("_id")) { +// return (String) message.getHeaders().get("_id"); +// } String prefix = determinePrefixToUse(message.getHeaders()); return (String) message.getHeaders().get(prefix + MessageHeaders.ID); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java index e7339f5c7..a0f1861d2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java @@ -34,6 +34,8 @@ import org.springframework.messaging.Message; @Configuration(proxyBeanMethods = false) class CloudEventsFunctionExtensionConfiguration { + // The following two beans are intended to be mutually exclusive. Only one should be activated based + // on the presence of Cloud Event SDK API @Bean @ConditionalOnMissingClass("io.cloudevents.CloudEvent") @ConditionalOnMissingBean @@ -48,4 +50,5 @@ class CloudEventsFunctionExtensionConfiguration { // TODO you may need SDKs header provider return null; } + // ======================================================== } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java index ce12537e2..c506e11bb 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.UUID; import org.springframework.beans.BeansException; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -36,7 +37,7 @@ import org.springframework.util.StringUtils; * This is a primary (and the only) integration bridge with {@link FunctionInvocationHelper}. * * @author Oleg Zhurakousky - * @since 2.0 + * @since 3.1 * */ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper>, ApplicationContextAware { @@ -51,7 +52,8 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper message) { - if (message.getHeaders().containsKey("message-type") && message.getHeaders().get("message-type").equals("cloudevent")) { + if (message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE) + && message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE)) { return true; } return false; @@ -64,13 +66,19 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper postProcessResult(Message input, Object result) { - Message resultMessage = null; + Message resultMessage = result instanceof Message ? (Message) result : null; if (CloudEventMessageUtils.isCloudEvent(input)) { - CloudEventMessageBuilder messageBuilder = CloudEventMessageBuilder - .withData(result) - .setId(UUID.randomUUID().toString()) - .setSource(URI.create("http://spring.io/" + getApplicationName())) - .setType(result.getClass().getName()); + CloudEventMessageBuilder messageBuilder; + if (result instanceof Message) { + messageBuilder = CloudEventMessageBuilder.fromMessage((Message) result); + } + else { + messageBuilder = CloudEventMessageBuilder + .withData(result) + .setId(UUID.randomUUID().toString()) + .setSource(URI.create("http://spring.io/" + getApplicationName())) + .setType(result.getClass().getName()); + } if (this.cloudEventAttributesProvider != null) { messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder); @@ -80,9 +88,10 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper)) { resultMessage = MessageBuilder.withPayload(result).build(); } + return resultMessage; } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 7afc840e9..4de0bbf58 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -621,6 +621,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect Map headersMap = (Map) ReflectionUtils .getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders()); this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v)); + if (functionInvocationHelper != null) { + result = functionInvocationHelper.postProcessResult((Message) input, result); + } } else { if (functionInvocationHelper != null) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java index 8c1cb3c72..b764611c6 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java @@ -30,10 +30,20 @@ import org.springframework.util.ReflectionUtils; /** * @author Dave Syer - * + * @author Oleg Zhurakousky */ public abstract class MessageUtils { + /** + * Value for 'message-type' typically use as header key. + */ + public static String MESSAGE_TYPE = "message-type"; + + /** + * Value for 'target-protocol' typically use as header key. + */ + public static String TARGET_PROTOCOL = "target-protocol"; + /** * Create a message for the handler. If the handler is a wrapper for a function in an * isolated class loader, then the message will be created with the target class diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java index aa2e80879..0c768efbc 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java @@ -56,7 +56,6 @@ public class CloudEventFunctionTests { .setType("org.springframework") .build(); - assertThat(inputMessage.getHeaders().getId()).isEqualTo(UUID.fromString(id)); assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue(); Message resultMessage = (Message) function.apply(inputMessage); @@ -87,8 +86,6 @@ public class CloudEventFunctionTests { .setHeader("ce_type", "org.springframework") .build(); -// assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue(); - Message resultMessage = (Message) function.apply(inputMessage); /* @@ -139,6 +136,44 @@ public class CloudEventFunctionTests { assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } + @SuppressWarnings("unchecked") + @Test + public void testStructuredPojoToPojoMessageFunction() throws Exception { + String payload = "{\n" + + " \"specversion\" : \"1.0\",\n" + + " \"type\" : \"org.springframework\",\n" + + " \"source\" : \"https://spring.io/\",\n" + + " \"id\" : \"A234-1234-1234\",\n" + + " \"datacontenttype\" : \"application/json\",\n" + + " \"data\" : {\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }\n" + + "}"; + Function function = this.lookup("springReleaseAsMessage", TestConfiguration.class); + + Message inputMessage = MessageBuilder + .withPayload(payload) + .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") + .build(); + + assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse(); + + Message resultMessage = (Message) function.apply(inputMessage); + assertThat(resultMessage.getPayload().getReleaseDate()) + .isEqualTo(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006")); + assertThat(resultMessage.getPayload().getVersion()).isEqualTo("2.0"); +// /* +// * Validates that although user only deals with POJO, the framework recognizes +// * both on input and output that it is dealing with Cloud Event and generates +// * appropriate headers/attributes +// */ + assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName()); + assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("https://spring.release.event")); + } + @SuppressWarnings("unchecked") @Test public void testStructuredPojoToPojoDefaultOutputAttributeProviderNoDataContentType() throws Exception { @@ -155,8 +190,8 @@ public class CloudEventFunctionTests { "}"; Function function = this.lookup("springRelease", TestConfiguration.class); - Message inputMessage = CloudEventMessageBuilder - .withData(payload) + Message inputMessage = MessageBuilder + .withPayload(payload) .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") .build(); assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse(); @@ -202,6 +237,18 @@ public class CloudEventFunctionTests { } }; } + + @Bean + Function, Message> springReleaseAsMessage() { + return message -> { + SpringReleaseEvent updated = springRelease().apply(message.getPayload()); + return CloudEventMessageBuilder.withData(updated) + .copyHeaders(message.getHeaders()) + .setSource("https://spring.release.event") + .setType(SpringReleaseEvent.class.getName()) + .build(); + }; + } } public static class Person { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java index f76b65f77..65d298f97 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java @@ -46,7 +46,7 @@ public class CloudEventMessageUtilsAndBuilderTests { Message kafkaMessage = CloudEventMessageBuilder.fromMessage(httpMessage).build(CloudEventMessageUtils.KAFKA_ATTR_PREFIX); attributes = CloudEventMessageUtils.getAttributes(kafkaMessage); - assertThat(attributes.size()).isEqualTo(3); + assertThat(attributes.size()).isEqualTo(4); // id will be auto injected, so always at least 4 (as tehre are 4 required attributes in CE) assertThat(kafkaMessage.getHeaders().get("ce_source")).isNotNull(); assertThat(CloudEventMessageUtils.getSource(kafkaMessage)).isEqualTo(URI.create("https://foo.bar")); assertThat(kafkaMessage.getHeaders().get("ce_type")).isNotNull(); @@ -56,7 +56,7 @@ public class CloudEventMessageUtilsAndBuilderTests { httpMessage = CloudEventMessageBuilder.fromMessage(kafkaMessage).build(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX); attributes = CloudEventMessageUtils.getAttributes(httpMessage); - assertThat(attributes.size()).isEqualTo(3); + assertThat(attributes.size()).isEqualTo(4); // assertThat(httpMessage.getHeaders().get("ce-source")).isNotNull(); assertThat(CloudEventMessageUtils.getSource(httpMessage)).isEqualTo(URI.create("https://foo.bar")); assertThat(httpMessage.getHeaders().get("ce-type")).isNotNull();