diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index d3d4bc0a9..47ef6caf1 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -199,10 +199,8 @@ public final class CloudEventMessageBuilder { this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); MessageHeaders headers = new MessageHeaders(this.headers); GenericMessage message = new GenericMessage(this.data, headers); - Assert.hasText(CloudEventMessageUtils.getSpecVersion(message), "'specversion' must not be null or empty"); - Assert.notNull(CloudEventMessageUtils.getSource(message), "'source' must not be null"); - Assert.hasText(CloudEventMessageUtils.getType(message), "'type' must not be null or empty"); - Assert.hasText(CloudEventMessageUtils.getId(message), "'id' must not be null or empty"); + Assert.isTrue(CloudEventMessageUtils.isCloudEvent(message), "The message does not appear to be a valid Cloud Event, " + + "since one of the required attributes (id, specversion, type, source) is missing"); return message; } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index ca6b2cc76..2cd2ab008 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -265,20 +265,9 @@ public final class CloudEventMessageUtils { */ static String determinePrefixToUse(Map messageHeaders) { String targetProtocol = (String) messageHeaders.get(MessageUtils.TARGET_PROTOCOL); - if (StringUtils.hasText(targetProtocol)) { - if ("kafka".equals(targetProtocol)) { - return CloudEventMessageUtils.KAFKA_ATTR_PREFIX; - } - else if ("amqp".equals(targetProtocol)) { - return CloudEventMessageUtils.AMQP_ATTR_PREFIX; - } - else if ("http".equals(targetProtocol)) { - return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; - } - else { - throw new IllegalArgumentException("Provided TARGET_PROTOCOL is not suported: " + targetProtocol + ". " - + "Supported protoclos are, 'kafka', 'amqp' and 'http'"); - } + String prefix = determinePrefixToUse(targetProtocol); + if (StringUtils.hasText(prefix)) { + return prefix; } else { for (String key : messageHeaders.keySet()) { @@ -297,16 +286,44 @@ public final class CloudEventMessageUtils { return ""; } + /** + * Determines attribute prefix based on the provided target protocol. + * @param targetProtocol target protocol (see {@link MessageUtils#TARGET_PROTOCOL} + * @return prefix (e.g., 'ce_' or 'ce-' etc.) + */ + static String determinePrefixToUse(String targetProtocol) { + if (StringUtils.hasText(targetProtocol)) { + if (Protocols.KAFKA.equals(targetProtocol)) { + return CloudEventMessageUtils.KAFKA_ATTR_PREFIX; + } + else if (Protocols.AMQP.equals(targetProtocol)) { + return CloudEventMessageUtils.AMQP_ATTR_PREFIX; + } + else if (Protocols.HTTP.equals(targetProtocol)) { + return CloudEventMessageUtils.DEFAULT_ATTR_PREFIX; + } + } + return ""; + } + /** * Will check for the existence of required attributes. Assumes attributes (headers) * are in canonical form. * @param message input {@link Message} * @return true if this Message represents Cloud Event in binary-mode */ - static boolean isCloudEvent(Message message) { - return message.getHeaders().containsKey(SPECVERSION) - && message.getHeaders().containsKey(TYPE) - && message.getHeaders().containsKey(SOURCE); + public static boolean isCloudEvent(Message message) { + return (message.getHeaders().containsKey(SPECVERSION) + && message.getHeaders().containsKey(TYPE) + && message.getHeaders().containsKey(SOURCE)) + || + (message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SPECVERSION) + && message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _TYPE) + && message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SOURCE)) + || + (message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SPECVERSION) + && message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _TYPE) + && message.getHeaders().containsKey(KAFKA_ATTR_PREFIX + _SOURCE)); } private static boolean isAttribute(String key) { @@ -376,4 +393,13 @@ public final class CloudEventMessageUtils { } return (URI) uri; } + + public static class Protocols { + static String AMQP = "amqp"; + static String AVRO = "avro"; + static String HTTP = "http"; + static String JSON = "json"; + static String KAFKA = "kafka"; + } + } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java index b491db98f..778c17216 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventsFunctionInvocationHelper.java @@ -30,7 +30,6 @@ import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; /** @@ -59,6 +58,7 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper preProcessInput(Message input, Object inputConverter) { + // TODO find a way to invoke it conditionally. May be check for certain headers with all known prefixes as well as content type try { return CloudEventMessageUtils.toCanonical(input, (MessageConverter) inputConverter); } @@ -68,32 +68,15 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper postProcessResult(Message input, Object result) { - Message resultMessage = result instanceof Message ? (Message) result : null; - if (CloudEventMessageUtils.isCloudEvent(input)) { - CloudEventMessageBuilder messageBuilder; - if (result instanceof Message) { - messageBuilder = CloudEventMessageBuilder.fromMessage((Message) result); - } - else { - messageBuilder = CloudEventMessageBuilder - .withData(result) - .setId(UUID.randomUUID().toString()) - .setSource(URI.create("http://spring.io/" + getApplicationName())) - .setType(result.getClass().getName()); - } + public Message postProcessResult(Object result, Message input) { + String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(input.getHeaders()); + return this.doPostProcessResult(result, targetPrefix); + } - if (this.cloudEventAttributesProvider != null) { - messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder); - } - - resultMessage = messageBuilder.build(CloudEventMessageUtils.determinePrefixToUse(input.getHeaders())); - } - else if (!(result instanceof Message)) { - resultMessage = MessageBuilder.withPayload(result).build(); - } - - return resultMessage; + @Override + public Message postProcessResult(Object result, String targetProtocol) { + String targetPrefix = CloudEventMessageUtils.determinePrefixToUse(targetProtocol); + return this.doPostProcessResult(result, targetPrefix); } @Override @@ -101,6 +84,27 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper doPostProcessResult(Object result, String targetPrefix) { + Message resultMessage = null; //result instanceof Message ? (Message) result : null; + CloudEventMessageBuilder messageBuilder; + if (result instanceof Message) { + messageBuilder = CloudEventMessageBuilder.fromMessage((Message) result); + } + else { + messageBuilder = CloudEventMessageBuilder + .withData(result) + .setId(UUID.randomUUID().toString()) + .setSource(URI.create("http://spring.io/" + getApplicationName())) + .setType(result.getClass().getName()); + } + + if (this.cloudEventAttributesProvider != null) { + messageBuilder = this.cloudEventAttributesProvider.enrich(messageBuilder); + } + + resultMessage = messageBuilder.build(targetPrefix); + return resultMessage; + } private String getApplicationName() { ConfigurableEnvironment environment = this.applicationContext.getEnvironment(); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 6d953fe71..e46900f51 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -45,6 +46,7 @@ import reactor.util.function.Tuples; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; @@ -621,13 +623,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect Map headersMap = (Map) ReflectionUtils .getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders()); this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v)); - if (functionInvocationHelper != null) { - result = functionInvocationHelper.postProcessResult((Message) input, result); + if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) { + result = functionInvocationHelper.postProcessResult(result, (Message) input); } } else { - if (functionInvocationHelper != null) { - result = functionInvocationHelper.postProcessResult((Message) input, result); + if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) { + result = functionInvocationHelper.postProcessResult(result, (Message) input); } else { result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); @@ -694,12 +696,24 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @SuppressWarnings("unchecked") private Object invokeFunctionAndEnrichResultIfNecessary(Object value) { + AtomicReference> firstInputMessage = new AtomicReference<>(); + Object inputValue; if (value instanceof Flux) { - inputValue = ((Flux) value).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v)); + inputValue = ((Flux) value).map(v -> { + if (v instanceof OriginalMessageHolder && firstInputMessage.get() == null) { + firstInputMessage.set(((OriginalMessageHolder) v).getOriginalMessage()); + } + return this.extractValueFromOriginalValueHolderIfNecessary(v); + }); } else if (value instanceof Mono) { - inputValue = ((Mono) value).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v)); + inputValue = ((Mono) value).map(v -> { + if (v instanceof OriginalMessageHolder) { + firstInputMessage.set(((OriginalMessageHolder) v).getOriginalMessage()); + } + return this.extractValueFromOriginalValueHolderIfNecessary(v); + }); } else { inputValue = this.extractValueFromOriginalValueHolderIfNecessary(value); @@ -710,6 +724,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } Object result = ((Function) this.target).apply(inputValue); + if (result instanceof Flux && functionInvocationHelper != null) { + result = ((Flux) result).map(v -> { + if (firstInputMessage.get() != null && CloudEventMessageUtils.isCloudEvent(firstInputMessage.get())) { + return functionInvocationHelper.postProcessResult(v, firstInputMessage.get()); + } + return v; + }); + } + return value instanceof OriginalMessageHolder ? this.enrichInvocationResultIfNecessary(((OriginalMessageHolder) value).getOriginalMessage(), result) : result; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java index 0c768efbc..09b336c1e 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventFunctionTests.java @@ -22,10 +22,12 @@ import java.util.UUID; import java.util.function.Function; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -33,6 +35,8 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; + + import static org.assertj.core.api.Assertions.assertThat; /** @@ -71,6 +75,86 @@ public class CloudEventFunctionTests { assertThat(CloudEventMessageUtils.getSource(resultMessage)).isEqualTo(URI.create("http://spring.io/application-application")); } + /* + * Aside from the properly processing and recognizing CE, the following tow tests (imperative and reactive) + * also emulate message coming from one protocol going to another via MessageUtils.TARGET_PROTOCOL header that + * is set here explicitly but for instance in s-c-stream is set by the framework + */ + @Test + public void testBinaryPojoToPojoDefaultOutputHeaderProviderImperative() { + Function function = this.lookup("springRelease", TestConfiguration.class); + + String id = UUID.randomUUID().toString(); + + String payload = "{\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }"; + + Message inputMessage = CloudEventMessageBuilder + .withData(payload) + .setId(id) + .setSource("https://spring.io/") + .setType("org.springframework") + .setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA) + .build(CloudEventMessageUtils.AMQP_ATTR_PREFIX); + + assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue(); + + Message message = (Message) function.apply(inputMessage); + + /* + * Validates that although user only deals with POJO, the framework recognizes + * both on input and output that it is dealing with Cloud Event and generates + * appropriate headers/attributes + */ + + assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue(); + assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName()); + assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/application-application")); + assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/application-application")); + } + + @SuppressWarnings("unchecked") + @Test + public void testBinaryPojoToPojoDefaultOutputHeaderProviderReactive() { + Function function = this.lookup("springReleaseReactive", TestConfiguration.class); + + String id = UUID.randomUUID().toString(); + + String payload = "{\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }"; + + Message inputMessage = CloudEventMessageBuilder + .withData(payload) + .setId(id) + .setSource("https://spring.io/") + .setType("org.springframework") + .setHeader(MessageUtils.TARGET_PROTOCOL, CloudEventMessageUtils.Protocols.KAFKA) + .build(CloudEventMessageUtils.AMQP_ATTR_PREFIX); + + assertThat(CloudEventMessageUtils.isCloudEvent(inputMessage)).isTrue(); + + Message message = ((Flux>) function.apply(Flux.just(inputMessage))).blockFirst(); + + /* + * Validates that although user only deals with POJO, the framework recognizes + * both on input and output that it is dealing with Cloud Event and generates + * appropriate headers/attributes + */ + + assertThat(CloudEventMessageUtils.isCloudEvent(message)).isTrue(); + assertThat(CloudEventMessageUtils.getType(message)).isEqualTo(SpringReleaseEvent.class.getName()); + assertThat(CloudEventMessageUtils.getSource(message)).isEqualTo(URI.create("http://spring.io/application-application")); + assertThat(message.getHeaders().get("ce_source")).isEqualTo(URI.create("http://spring.io/application-application")); + } + + + // this kind of emulates that message came from Kafka @SuppressWarnings("unchecked") @Test @@ -238,6 +322,20 @@ public class CloudEventFunctionTests { }; } + @Bean + Function, Flux> springReleaseReactive() { + return flux -> flux.map(event -> { + try { + event.setReleaseDate(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006")); + event.setVersion("2.0"); + return event; + } + catch (Exception e) { + throw new IllegalArgumentException(e); + } + }); + } + @Bean Function, Message> springReleaseAsMessage() { return message -> { diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java index 35d49adb1..37a2444ee 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionInvocationHelper.java @@ -30,5 +30,7 @@ public interface FunctionInvocationHelper { I preProcessInput(I input, Object inputConverter); - I postProcessResult(I input, Object result); + I postProcessResult(Object result, String hint); + + I postProcessResult(Object result, I input); }