diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAtttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAtttributesProvider.java index de8986ef4..e3cb98726 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAtttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAtttributesProvider.java @@ -16,6 +16,10 @@ package org.springframework.cloud.function.cloudevent; +import java.util.Collections; +import java.util.Map; + +import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** @@ -57,4 +61,12 @@ public interface CloudEventAtttributesProvider { * @return modifiable instance of {@link CloudEventAttributes} */ RequiredAttributeAccessor get(MessageHeaders headers); + + /** + * + * @param inputMessage input message used to invoke user functionality (e.g., function) + * @param result result of the invocation of user functionality (e.g., function) + * @return instance of {@link CloudEventAttributes} + */ + Map generateDefaultCloudEventHeaders(Message inputMessage, Object result); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java index d01451065..61b2d6c58 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java @@ -16,12 +16,20 @@ package org.springframework.cloud.function.cloudevent; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * @@ -29,10 +37,9 @@ import org.springframework.util.Assert; * @since 3.1 * */ -public class DefaultCloudEventAttributesProvider implements CloudEventAtttributesProvider { - /* - * should i provide instance() method for convinience or should it be always injected into function - */ +public class DefaultCloudEventAttributesProvider implements CloudEventAtttributesProvider, ApplicationContextAware { + + private ConfigurableApplicationContext applicationContext; @Override public CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) { @@ -61,4 +68,26 @@ public class DefaultCloudEventAttributesProvider implements CloudEventAtttribute return new RequiredAttributeAccessor(headers); } + @Override + public Map generateDefaultCloudEventHeaders(Message inputMessage, Object result) { + if (inputMessage.getHeaders().containsKey(CloudEventMessageUtils.CE_ID)) { // input is a cloud event + String applicationName = this.getApplicationName(); + return this.get(inputMessage.getHeaders()) + .setId(UUID.randomUUID().toString()) + .setType(result.getClass().getName()) + .setSource(applicationName); + } + return Collections.emptyMap(); + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = (ConfigurableApplicationContext) applicationContext; + } + + private String getApplicationName() { + ConfigurableEnvironment environment = this.applicationContext.getEnvironment(); + String name = environment.getProperty("spring.application.name"); + return "http://spring.io/" + (StringUtils.hasText(name) ? name : "application-" + this.applicationContext.getId()); + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 69aebe147..26094f4ec 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -20,6 +20,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Type; import java.util.Arrays; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -30,7 +31,9 @@ import org.aopalliance.intercept.MethodInvocation; import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; +import org.springframework.cloud.function.cloudevent.CloudEventAtttributesProvider; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; @@ -39,7 +42,9 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.convert.ConversionService; +import org.springframework.messaging.Message; import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StringUtils; /** @@ -51,6 +56,9 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp private GenericApplicationContext applicationContext; + @Autowired(required = false) + private CloudEventAtttributesProvider cloudEventAtttributesProvider; + public BeanFactoryAwareFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) { super(conversionService, messageConverter, jsonMapper); @@ -150,9 +158,26 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes); } + if (function != null && this.cloudEventAtttributesProvider != null) { + BiFunction, Object, Message> invocationResultHeaderEnricher = new BiFunction, Object, Message>() { + + @Override + public Message apply(Message inputMessage, Object invocationResult) { + Message message = MessageBuilder.withPayload(invocationResult).copyHeaders( + cloudEventAtttributesProvider.generateDefaultCloudEventHeaders(inputMessage, invocationResult)) + .build(); + + return message; + } + }; + function.setOutputMessageHeaderEnricher(invocationResultHeaderEnricher); + } + return (T) function; } + + private Object discoverFunctionInBeanFactory(String functionName) { Object functionCandidate = null; if (this.applicationContext.containsBean(functionName)) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index d95e0b48d..51763d4b5 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -320,6 +321,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ private Function enhancer; + private BiFunction, Object, Message> outputMessageHeaderEnricher; + + void setOutputMessageHeaderEnricher(BiFunction, Object, Message> outputMessageHeaderEnricher) { + this.outputMessageHeaderEnricher = outputMessageHeaderEnricher; + } + FunctionInvocationWrapper(FunctionInvocationWrapper function) { this.target = function.target; this.inputType = function.inputType; @@ -615,7 +622,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v)); } else { - result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); + if (this.outputMessageHeaderEnricher != null) { + result = this.outputMessageHeaderEnricher.apply((Message) input, result); + } + else { + result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); + } } } return result; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 5418c896a..6c65ce8ed 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -75,6 +75,7 @@ public class ContextFunctionCatalogAutoConfiguration { static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper"; @Bean + @ConditionalOnMissingBean public CloudEventAtttributesProvider cloudEventAttributesProvider() { return new DefaultCloudEventAttributesProvider(); } diff --git a/spring-cloud-function-samples/function-sample-azure/pom.xml b/spring-cloud-function-samples/function-sample-azure/pom.xml index 29273cef5..1ba422bc7 100644 --- a/spring-cloud-function-samples/function-sample-azure/pom.xml +++ b/spring-cloud-function-samples/function-sample-azure/pom.xml @@ -54,7 +54,7 @@ org.springframework.cloud spring-cloud-function-dependencies - 3.1.0.BUILD-SNAPSHOT + 3.1.0-SNAPSHOT pom import diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index 02b0964f1..96d853e3a 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -90,29 +90,21 @@ public class CloudeventDemoApplication { data.setReleaseDateAsString("01-10-2006"); CloudEventAttributes ceAttributes = ceAttrProvider.get(ceMessage.getHeaders()) - .setSource("https://interface21.icom/") + .setSource("https://interface21.com/") .setType("com.interface21"); return MessageBuilder.withPayload(data).copyHeaders(ceAttributes).build(); }; } -// // spring.io/applicationName -// -// @Bean -// public Function, SpringReleaseEvent> consumeAndProduceCloudEvent() { -// return ceMessage -> { -// SpringReleaseEvent data = ceMessage.getPayload(); -// data.setVersion("2.0"); -// data.setReleaseDateAsString("01-10-2006"); -// -// CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); -// -// CloudEventAttributes ceAttributes = ceAttrProvider.get(ceMessage.getHeaders()) -// .setSource("https://interface21.icom/") -// .setType("com.interface21"); -// -// return MessageBuilder.withPayload(data).copyHeaders(ceAttributes).build(); -// }; -// } + @Bean + public Function, SpringReleaseEvent> consumeAndProduceCloudEventPojo() { + return ceMessage -> { + SpringReleaseEvent data = ceMessage.getPayload(); + data.setVersion("2.0"); + data.setReleaseDateAsString("01-10-2006"); + + return data; + }; + } } diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java index c20062e19..4606012cd 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java @@ -50,16 +50,16 @@ public class CloudeventDemoApplicationFunctionTests { * is (see `asPOJOMessage` and `asPOJO` specifically). Type conversion will happen * inside spring-cloud-function. */ - Function, String> asPojoMessage = catalog.lookup("asPOJOMessage"); + Function, Message> asPojoMessage = catalog.lookup("asPOJOMessage"); System.out.println(asPojoMessage.apply(binaryCloudEventMessage)); - Function, String> asPojo = catalog.lookup("asPOJO"); + Function, Message> asPojo = catalog.lookup("asPOJO"); System.out.println(asPojo.apply(binaryCloudEventMessage)); - Function, String> asString = catalog.lookup("asString"); + Function, Message> asString = catalog.lookup("asString"); System.out.println(asString.apply(binaryCloudEventMessage)); - Function, String> asStringMessage = catalog.lookup("asStringMessage"); + Function, Message> asStringMessage = catalog.lookup("asStringMessage"); System.out.println(asStringMessage.apply(binaryCloudEventMessage)); } } @@ -83,4 +83,24 @@ public class CloudeventDemoApplicationFunctionTests { System.out.println(asPojoMessage.apply(binaryCloudEventMessage)); } } + + @Test + public void demoPureFunctionProduceConsumeCloudEventAsPojo() { + try(ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { + FunctionCatalog catalog = context.getBean(FunctionCatalog.class); + CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + Message binaryCloudEventMessage = MessageBuilder + .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") + .copyHeaders(ceAttrProvider.get("spring.io/spring-event", "com.example.springevent")) + .build(); + + /* + * NOTE how it makes no difference what the actual function signature + * is (see `asPOJOMessage` and `asPOJO` specifically). Type conversion will happen + * inside spring-cloud-function. + */ + Function, Message> asPojoMessage = catalog.lookup("consumeAndProduceCloudEventPojo"); + System.out.println(asPojoMessage.apply(binaryCloudEventMessage)); + } + } }