diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAtttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java similarity index 98% rename from spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAtttributesProvider.java rename to spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java index 152a0c0a9..3ad1a9395 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAtttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventAttributesProvider.java @@ -26,7 +26,7 @@ import org.springframework.messaging.MessageHeaders; * @author Oleg Zhurakousky * @since 3.1 */ -public interface CloudEventAtttributesProvider { +public interface CloudEventAttributesProvider { /** * Will construct instance of {@link CloudEventAttributes} setting its required attributes. diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java index 5ff1d60ea..531da48c9 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventDataContentTypeMessagePreProcessor.java @@ -90,7 +90,11 @@ public class CloudEventDataContentTypeMessagePreProcessor implements Function buildCeMessageFromStructured(Map structuredCloudEvent) { - MessageBuilder builder = MessageBuilder.withPayload(structuredCloudEvent.get(CloudEventMessageUtils.DATA)); + MessageBuilder builder = MessageBuilder.withPayload( + structuredCloudEvent.containsKey(CloudEventMessageUtils.CE_DATA) + ? structuredCloudEvent.get(CloudEventMessageUtils.CE_DATA) + : structuredCloudEvent.get(CloudEventMessageUtils.DATA)); + structuredCloudEvent.remove(CloudEventMessageUtils.CE_DATA); structuredCloudEvent.remove(CloudEventMessageUtils.DATA); builder.copyHeaders(structuredCloudEvent); return builder.build(); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index b92de95d3..c5cbf8e61 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -49,7 +49,7 @@ public final class CloudEventMessageUtils { /** * Prefix for attributes. */ - public static String ATTR_PREFIX = "ce_"; + public static String ATTR_PREFIX = "ce-"; /** * Value for 'data' attribute. diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java index 61b2d6c58..d1ba9430e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/DefaultCloudEventAttributesProvider.java @@ -37,7 +37,7 @@ import org.springframework.util.StringUtils; * @since 3.1 * */ -public class DefaultCloudEventAttributesProvider implements CloudEventAtttributesProvider, ApplicationContextAware { +public class DefaultCloudEventAttributesProvider implements CloudEventAttributesProvider, ApplicationContextAware { private ConfigurableApplicationContext applicationContext; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 26094f4ec..a5979ae64 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -33,7 +33,7 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; -import org.springframework.cloud.function.cloudevent.CloudEventAtttributesProvider; +import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; @@ -57,7 +57,7 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp private GenericApplicationContext applicationContext; @Autowired(required = false) - private CloudEventAtttributesProvider cloudEventAtttributesProvider; + private CloudEventAttributesProvider cloudEventAtttributesProvider; public BeanFactoryAwareFunctionRegistry(ConversionService conversionService, CompositeMessageConverter messageConverter, JsonMapper jsonMapper) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 51763d4b5..d4351fdd1 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -866,7 +866,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect output = enhancer.apply(output); } - if (ObjectUtils.isEmpty(contentType)) { + if (ObjectUtils.isEmpty(contentType) && !(output instanceof Publisher)) { return output; } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 6c65ce8ed..5a8335184 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -30,7 +30,7 @@ import com.google.gson.Gson; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.cloud.function.cloudevent.CloudEventAtttributesProvider; +import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; import org.springframework.cloud.function.cloudevent.CloudEventDataContentTypeMessagePreProcessor; import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter; import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider; @@ -76,7 +76,7 @@ public class ContextFunctionCatalogAutoConfiguration { @Bean @ConditionalOnMissingBean - public CloudEventAtttributesProvider cloudEventAttributesProvider() { + public CloudEventAttributesProvider cloudEventAttributesProvider() { return new DefaultCloudEventAttributesProvider(); } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java index 88df74cd2..55c04d649 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventTypeConversionTests.java @@ -45,7 +45,7 @@ public class CloudEventTypeConversionTests { @Test public void testFromMessageBinaryPayloadMatchesType() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + CloudEventAttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); CloudEventAttributes ceAttributes = ceAttrProvider .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); ceAttributes.setDataContentType("text/plain"); @@ -58,7 +58,7 @@ public class CloudEventTypeConversionTests { @Test public void testFromMessageBinaryPayloadDoesNotMatchType() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + CloudEventAttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); CloudEventAttributes ceAttributes = ceAttrProvider .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) @@ -74,7 +74,7 @@ public class CloudEventTypeConversionTests { // this works public void testFromMessageBinaryPayloadNoDataContentTypeToString() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + CloudEventAttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); CloudEventAttributes ceAttributes = ceAttrProvider .get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) @@ -89,7 +89,7 @@ public class CloudEventTypeConversionTests { @Test // Unlike the previous test the type here is POJO so no special treatement public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + CloudEventAttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); CloudEventAttributes ceAttributes = ceAttrProvider.get("https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("Hello Ricky".getBytes()) .copyHeaders(ceAttributes) @@ -103,7 +103,7 @@ public class CloudEventTypeConversionTests { @Test // will fall on default CT which is json public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() { SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class); - CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + CloudEventAttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); CloudEventAttributes ceAttributes = ceAttrProvider.get("https://spring.io/", "org.springframework"); Message message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes()) .copyHeaders(ceAttributes) diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index 96d853e3a..b2868a10a 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -16,13 +16,18 @@ package io.spring.cloudevent; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; import java.util.function.Function; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.function.cloudevent.CloudEventAttributes; -import org.springframework.cloud.function.cloudevent.CloudEventAtttributesProvider; +import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; +import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils; import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -83,7 +88,7 @@ public class CloudeventDemoApplication { } @Bean - public Function, Message> consumeAndProduceCloudEvent(CloudEventAtttributesProvider ceAttrProvider) { + public Function, Message> consumeAndProduceCloudEvent(CloudEventAttributesProvider ceAttrProvider) { return ceMessage -> { SpringReleaseEvent data = ceMessage.getPayload(); data.setVersion("2.0"); @@ -107,4 +112,24 @@ public class CloudeventDemoApplication { return data; }; } + + @Bean + public CloudEventAttributesProvider cloudEventAttributesProvider() { + return new CustomCloudEventAtttributesProvider(); + } + + public static class CustomCloudEventAtttributesProvider extends DefaultCloudEventAttributesProvider { + + @Override + public Map generateDefaultCloudEventHeaders(Message inputMessage, Object result) { + if (inputMessage.getHeaders().containsKey(CloudEventMessageUtils.CE_ID)) { // input is a cloud event + String applicationName = "http://spring.io/fooBar"; + return this.get(inputMessage.getHeaders()) + .setId(UUID.randomUUID().toString()) + .setType(result.getClass().getName()) + .setSource(applicationName); + } + return Collections.emptyMap(); + } + } } diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/resources/application.properties b/spring-cloud-function-samples/function-sample-cloudevent/src/main/resources/application.properties index b1accb57d..2d0521080 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/resources/application.properties +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/resources/application.properties @@ -1 +1 @@ -spring.cloud.function.definition=asPOJOMessage +#spring.cloud.function.definition=asPOJOMessage diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java index 4606012cd..2e46b5ef9 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationFunctionTests.java @@ -20,7 +20,7 @@ import java.util.function.Function; import org.junit.jupiter.api.Test; import org.springframework.boot.SpringApplication; -import org.springframework.cloud.function.cloudevent.CloudEventAtttributesProvider; +import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider; import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.context.ConfigurableApplicationContext; @@ -39,7 +39,7 @@ public class CloudeventDemoApplicationFunctionTests { try(ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + CloudEventAttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); Message binaryCloudEventMessage = MessageBuilder .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") .copyHeaders(ceAttrProvider.get("spring.io/spring-event", "com.example.springevent")) @@ -68,7 +68,7 @@ public class CloudeventDemoApplicationFunctionTests { public void demoPureFunctionProduceConsumeCloudEvent() { try(ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + CloudEventAttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); Message binaryCloudEventMessage = MessageBuilder .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") .copyHeaders(ceAttrProvider.get("spring.io/spring-event", "com.example.springevent")) @@ -88,7 +88,7 @@ public class CloudeventDemoApplicationFunctionTests { public void demoPureFunctionProduceConsumeCloudEventAsPojo() { try(ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - CloudEventAtttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); + CloudEventAttributesProvider ceAttrProvider = new DefaultCloudEventAttributesProvider(); Message binaryCloudEventMessage = MessageBuilder .withPayload("{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}") .copyHeaders(ceAttrProvider.get("spring.io/spring-event", "com.example.springevent")) diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index 1303158fd..31488a1c6 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -174,12 +174,12 @@ public class CloudeventDemoApplicationRESTTests { SpringApplication.run(CloudeventDemoApplication.class); String payload = "{\n" + - " \"specversion\" : \"1.0\",\n" + - " \"type\" : \"org.springframework\",\n" + - " \"source\" : \"https://spring.io/\",\n" + - " \"id\" : \"A234-1234-1234\",\n" + - " \"datacontenttype\" : \"application/json\",\n" + - " \"data\" : {\n" + + " \"ce-specversion\" : \"1.0\",\n" + + " \"ce-type\" : \"org.springframework\",\n" + + " \"ce-source\" : \"https://spring.io/\",\n" + + " \"ce-id\" : \"A234-1234-1234\",\n" + + " \"ce-datacontenttype\" : \"application/json\",\n" + + " \"ce-data\" : {\n" + " \"version\" : \"1.0\",\n" + " \"releaseName\" : \"Spring Framework\",\n" + " \"releaseDate\" : \"24-03-2004\"\n" + diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java index a4d1eef11..822f97530 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java @@ -79,7 +79,9 @@ public class FunctionController { Publisher result = (Publisher) function.apply(Flux.fromIterable(files)); BodyBuilder builder = ResponseEntity.ok(); if (result instanceof Flux) { - result = Flux.from(result).map(message -> ((Message) message).getPayload()).collectList(); + result = Flux.from(result).map(message -> { + return message instanceof Message ? ((Message) message).getPayload() : message; + }).collectList(); } return Mono.from(result).flatMap(body -> Mono.just(builder.body(body))); }