diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index c99a83ac7..a7a245389 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -26,15 +26,18 @@ import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** * Message builder which is aware of Cloud Event semantics. * It provides type-safe setters for v1.0 Cloud Event attributes while - * supporting any version by exposing a convenient {@link #setHeader(String, Object)} method. + * supporting any version by exposing a convenient + * {@link #setHeader(String, Object)} method. * * @author Oleg Zhurakousky * @since 3.1 @@ -139,13 +142,9 @@ public final class CloudEventMessageBuilder { } public Message build() { - if (!this.headers.containsKey(CloudEventMessageUtils.SPECVERSION)) { - this.headers.put(CloudEventMessageUtils.SPECVERSION, "1.0"); - } - return this.doBuild(); + return this.doBuild(CloudEventMessageUtils.determinePrefixToUse(this.headers)); } - public Message build(String attributePrefixToUse) { if (StringUtils.hasText(attributePrefixToUse)) { String[] keys = this.headers.keySet().toArray(new String[] {}); @@ -153,62 +152,50 @@ public final class CloudEventMessageBuilder { if (key.startsWith(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX)) { Object value = headers.remove(key); key = key.substring(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX.length()); - headers.put(attributePrefixToUse + key, value); + this.headers.put(attributePrefixToUse + key, value); } else if (key.startsWith(CloudEventMessageUtils.AMQP_ATTR_PREFIX)) { Object value = headers.remove(key); key = key.substring(CloudEventMessageUtils.AMQP_ATTR_PREFIX.length()); - headers.put(attributePrefixToUse + key, value); + this.headers.put(attributePrefixToUse + key, value); } else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) { Object value = headers.remove(key); key = key.substring(CloudEventMessageUtils.KAFKA_ATTR_PREFIX.length()); - headers.put(attributePrefixToUse + key, value); + this.headers.put(attributePrefixToUse + key, value); } } } - if (!this.headers.containsKey(attributePrefixToUse + "specversion")) { - String prefix = StringUtils.hasText(attributePrefixToUse) ? attributePrefixToUse : CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; + String prefix = StringUtils.hasText(attributePrefixToUse) + ? attributePrefixToUse + : CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; + return doBuild(prefix); + } + + private Message doBuild(String prefix) { + if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SPECVERSION)) { this.headers.put(prefix + CloudEventMessageUtils._SPECVERSION, "1.0"); } - return doBuild(); - } - - private Message doBuild() { - this.headers.put("message-type", "cloudevent"); - CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, this.getUUID(), null); - GenericMessage message = new GenericMessage(data, headers); - return message; - } - - private UUID getUUID() { - UUID id = null; - if (this.headers.containsKey(CloudEventMessageUtils.ID)) { - String stringId = this.headers.get(CloudEventMessageUtils.ID).toString(); - try { - id = UUID.fromString(stringId); - } - catch (Exception e) { - logger.info("Provided Cloud Event 'id' is not compatible with Message 'id' which is UUID, " - + "therefore Cloud Event 'id' will be written as '_id' message header"); - this.headers.put("_" + CloudEventMessageUtils.ID, stringId); - this.headers.remove(CloudEventMessageUtils.ID); - } + if (!this.headers.containsKey(prefix + CloudEventMessageUtils._ID)) { + this.headers.put(prefix + CloudEventMessageUtils._ID, UUID.randomUUID().toString()); } - return id; + this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); + CloudEventMessageHeaders headers = new CloudEventMessageHeaders(this.headers, null, null); + GenericMessage message = new GenericMessage(this.data, headers); + Assert.hasText(CloudEventMessageUtils.getSpecVersion(message), "'specversion' must not be null or empty"); + Assert.notNull(CloudEventMessageUtils.getSource(message), "'source' must not be null"); + Assert.hasText(CloudEventMessageUtils.getType(message), "'type' must not be null or empty"); + Assert.hasText(CloudEventMessageUtils.getId(message), "'id' must not be null or empty"); + return message; } private static class CloudEventMessageHeaders extends MessageHeaders { - /** - * - */ private static final long serialVersionUID = -6424866731588545945L; protected CloudEventMessageHeaders(Map headers, UUID id, Long timestamp) { super(headers, id, timestamp); } - } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 2cf107cf9..805f7fefb 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.ContentTypeResolver; @@ -79,6 +80,11 @@ public final class CloudEventMessageUtils { static String _TIME = "time"; // ================================ + /** + * String value of 'cloudevent'. Typically used as {@link MessageUtils#MESSAGE_TYPE} + */ + public static String CLOUDEVENT_VALUE = "cloudevent"; + /** * String value of 'application/cloudevents' mime type. */ @@ -156,9 +162,9 @@ public final class CloudEventMessageUtils { public static String getId(Message message) { - if (message.getHeaders().containsKey("_id")) { - return (String) message.getHeaders().get("_id"); - } +// if (message.getHeaders().containsKey("_id")) { +// return (String) message.getHeaders().get("_id"); +// } String prefix = determinePrefixToUse(message.getHeaders()); return (String) message.getHeaders().get(prefix + MessageHeaders.ID); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java index e7339f5c7..a0f1861d2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionExtensionConfiguration.java @@ -34,6 +34,8 @@ import org.springframework.messaging.Message; @Configuration(proxyBeanMethods = false) class CloudEventsFunctionExtensionConfiguration { + // The following two beans are intended to be mutually exclusive. Only one should be activated based + // on the presence of Cloud Event SDK API @Bean @ConditionalOnMissingClass("io.cloudevents.CloudEvent") @ConditionalOnMissingBean @@ -48,4 +50,5 @@ class CloudEventsFunctionExtensionConfiguration { // TODO you may need SDKs header provider return null; } + // ======================================================== } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java index ce12537e2..c506e11bb 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.UUID; import org.springframework.beans.BeansException; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -36,7 +37,7 @@ import org.springframework.util.StringUtils; * This is a primary (and the only) integration bridge with {@link FunctionInvocationHelper}. * * @author Oleg Zhurakousky - * @since 2.0 + * @since 3.1 * */ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper>, ApplicationContextAware { @@ -51,7 +52,8 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper message) { - if (message.getHeaders().containsKey("message-type") && message.getHeaders().get("message-type").equals("cloudevent")) { + if (message.getHeaders().containsKey(MessageUtils.MESSAGE_TYPE) + && message.getHeaders().get(MessageUtils.MESSAGE_TYPE).equals(CloudEventMessageUtils.CLOUDEVENT_VALUE)) { return true; } return false; @@ -64,13 +66,19 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper postProcessResult(Message input, Object result) { - Message resultMessage = null; + Message resultMessage = result instanceof Message ? (Message) result : null; if (CloudEventMessageUtils.isCloudEvent(input)) { - CloudEventMessageBuilder messageBuilder = CloudEventMessageBuilder - .withData(result) - .setId(UUID.randomUUID().toString()) - .setSource(URI.create("http://spring.io/" + getApplicationName())) - .setType(result.getClass().getName()); + CloudEventMessageBuilder messageBuilder; + if (result instanceof Message) { + messageBuilder = CloudEventMessageBuilder.fromMessage((Message) result); + } + else { + messageBuilder = CloudEventMessageBuilder + .withData(result) + .setId(UUID.randomUUID().toString()) + .setSource(URI.create("http://spring.io/" + getApplicationName())) + .setType(result.getClass().getName()); + } if (this.cloudEventAttributesProvider != null) { messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder); @@ -80,9 +88,10 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper)) { resultMessage = MessageBuilder.withPayload(result).build(); } + return resultMessage; } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 7afc840e9..4de0bbf58 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -621,6 +621,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect Map headersMap = (Map) ReflectionUtils .getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders()); this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v)); + if (functionInvocationHelper != null) { + result = functionInvocationHelper.postProcessResult((Message) input, result); + } } else { if (functionInvocationHelper != null) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java index 8c1cb3c72..b764611c6 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java @@ -30,10 +30,20 @@ import org.springframework.util.ReflectionUtils; /** * @author Dave Syer - * + * @author Oleg Zhurakousky */ public abstract class MessageUtils { + /** + * Value for 'message-type' typically use as header key. + */ + public static String MESSAGE_TYPE = "message-type"; + + /** + * Value for 'target-protocol' typically use as header key. + */ + public static String TARGET_PROTOCOL = "target-protocol"; + /** * Create a message for the handler. If the handler is a wrapper for a function in an * isolated class loader, then the message will be created with the target class diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java index aa2e80879..0c768efbc 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java @@ -56,7 +56,6 @@ public class CloudEventFunctionTests { .setType("org.springframework") .build(); - assertThat(inputMessage.getHeaders().getId()).isEqualTo(UUID.fromString(id)); assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue(); Message resultMessage = (Message) function.apply(inputMessage); @@ -87,8 +86,6 @@ public class CloudEventFunctionTests { .setHeader("ce_type", "org.springframework") .build(); -// assertThat(CloudEventMessageUtils.isBinary(inputMessage)).isTrue(); - Message resultMessage = (Message) function.apply(inputMessage); /* @@ -139,6 +136,44 @@ public class CloudEventFunctionTests { assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } + @SuppressWarnings("unchecked") + @Test + public void testStructuredPojoToPojoMessageFunction() throws Exception { + String payload = "{\n" + + " \"specversion\" : \"1.0\",\n" + + " \"type\" : \"org.springframework\",\n" + + " \"source\" : \"https://spring.io/\",\n" + + " \"id\" : \"A234-1234-1234\",\n" + + " \"datacontenttype\" : \"application/json\",\n" + + " \"data\" : {\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }\n" + + "}"; + Function function = this.lookup("springReleaseAsMessage", TestConfiguration.class); + + Message inputMessage = MessageBuilder + .withPayload(payload) + .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") + .build(); + + assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse(); + + Message resultMessage = (Message) function.apply(inputMessage); + assertThat(resultMessage.getPayload().getReleaseDate()) + .isEqualTo(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006")); + assertThat(resultMessage.getPayload().getVersion()).isEqualTo("2.0"); +// /* +// * Validates that although user only deals with POJO, the framework recognizes +// * both on input and output that it is dealing with Cloud Event and generates +// * appropriate headers/attributes +// */ + assertThat(CloudEventMessageUtils.isCloudEvent(resultMessage)).isTrue(); + assertThat(CloudEventMessageUtils.getType(resultMessage)).isEqualTo(SpringReleaseEvent.class.getName()); + assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("https://spring.release.event")); + } + @SuppressWarnings("unchecked") @Test public void testStructuredPojoToPojoDefaultOutputAttributeProviderNoDataContentType() throws Exception { @@ -155,8 +190,8 @@ public class CloudEventFunctionTests { "}"; Function function = this.lookup("springRelease", TestConfiguration.class); - Message inputMessage = CloudEventMessageBuilder - .withData(payload) + Message inputMessage = MessageBuilder + .withPayload(payload) .setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") .build(); assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isFalse(); @@ -202,6 +237,18 @@ public class CloudEventFunctionTests { } }; } + + @Bean + Function, Message> springReleaseAsMessage() { + return message -> { + SpringReleaseEvent updated = springRelease().apply(message.getPayload()); + return CloudEventMessageBuilder.withData(updated) + .copyHeaders(message.getHeaders()) + .setSource("https://spring.release.event") + .setType(SpringReleaseEvent.class.getName()) + .build(); + }; + } } public static class Person { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java index f76b65f77..65d298f97 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java @@ -46,7 +46,7 @@ public class CloudEventMessageUtilsAndBuilderTests { Message kafkaMessage = CloudEventMessageBuilder.fromMessage(httpMessage).build(CloudEventMessageUtils.KAFKA_ATTR_PREFIX); attributes = CloudEventMessageUtils.getAttributes(kafkaMessage); - assertThat(attributes.size()).isEqualTo(3); + assertThat(attributes.size()).isEqualTo(4); // id will be auto injected, so always at least 4 (as tehre are 4 required attributes in CE) assertThat(kafkaMessage.getHeaders().get("ce_source")).isNotNull(); assertThat(CloudEventMessageUtils.getSource(kafkaMessage)).isEqualTo(URI.create("https://foo.bar")); assertThat(kafkaMessage.getHeaders().get("ce_type")).isNotNull(); @@ -56,7 +56,7 @@ public class CloudEventMessageUtilsAndBuilderTests { httpMessage = CloudEventMessageBuilder.fromMessage(kafkaMessage).build(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX); attributes = CloudEventMessageUtils.getAttributes(httpMessage); - assertThat(attributes.size()).isEqualTo(3); + assertThat(attributes.size()).isEqualTo(4); // assertThat(httpMessage.getHeaders().get("ce-source")).isNotNull(); assertThat(CloudEventMessageUtils.getSource(httpMessage)).isEqualTo(URI.create("https://foo.bar")); assertThat(httpMessage.getHeaders().get("ce-type")).isNotNull();