diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml index 9ba9ca21b..6027800d5 100644 --- a/spring-cloud-function-context/pom.xml +++ b/spring-cloud-function-context/pom.xml @@ -86,6 +86,12 @@ kotlinx-coroutines-reactor true + + io.cloudevents + cloudevents-spring + 2.2.0 + true + diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 5cd9f04eb..24826f5a9 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -234,6 +234,9 @@ public final class CloudEventMessageUtils { static Message toCanonical(Message inputMessage, MessageConverter messageConverter) { Map headers = (Map) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders()); canonicalizeHeaders(headers, false); + if (isCloudEvent(inputMessage) && headers.containsKey("content-type")) { + inputMessage = MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, headers.get("content-type")).build(); + } String inputContentType = (String) inputMessage.getHeaders().get(DATACONTENTTYPE); // first check the obvious and see if content-type is `cloudevents` diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index ca2d437b8..03ff5658e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; +import io.cloudevents.spring.messaging.CloudEventMessageConverter; import org.springframework.beans.factory.BeanFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -129,6 +130,13 @@ public class ContextFunctionCatalogAutoConfiguration { return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionProperties, functionInvocationHelper); } + @Bean + @ConditionalOnMissingBean + @ConditionalOnClass(name = "io.cloudevents.spring.messaging.CloudEventMessageConverter") + public MessageConverter cloudEventMessageConverter() { + return new CloudEventMessageConverter(); + } + @Bean(RoutingFunction.FUNCTION_NAME) RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionProperties functionProperties, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) { diff --git a/spring-cloud-function-deployer/pom.xml b/spring-cloud-function-deployer/pom.xml index c0ce8f75a..f7017b1e1 100644 --- a/spring-cloud-function-deployer/pom.xml +++ b/spring-cloud-function-deployer/pom.xml @@ -45,6 +45,12 @@ spring-boot-configuration-processor true + + io.cloudevents + cloudevents-spring + 2.2.0 + test + diff --git a/spring-cloud-function-deployer/src/it/simplestjar/pom.xml b/spring-cloud-function-deployer/src/it/simplestjar/pom.xml index 6c8bebcb3..8eaa026d9 100644 --- a/spring-cloud-function-deployer/src/it/simplestjar/pom.xml +++ b/spring-cloud-function-deployer/src/it/simplestjar/pom.xml @@ -25,8 +25,30 @@ 1.8 + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + + + + io.cloudevents + cloudevents-api + 2.2.0 + true + + diff --git a/spring-cloud-function-deployer/src/it/simplestjar/src/main/java/function/example/EchoCloudEventFunction.java b/spring-cloud-function-deployer/src/it/simplestjar/src/main/java/function/example/EchoCloudEventFunction.java new file mode 100644 index 000000000..0dc913143 --- /dev/null +++ b/spring-cloud-function-deployer/src/it/simplestjar/src/main/java/function/example/EchoCloudEventFunction.java @@ -0,0 +1,16 @@ +package function.example; + +import java.util.Map; +import java.util.function.Function; + +import io.cloudevents.CloudEvent; + +public class EchoCloudEventFunction implements Function { + + @Override + public CloudEvent apply(CloudEvent value) { + System.out.println("Received " + value); + return value; + } + +} diff --git a/spring-cloud-function-deployer/src/it/simplestjar/src/main/java/function/example/UpperCaseFunction.java b/spring-cloud-function-deployer/src/it/simplestjar/src/main/java/function/example/UpperCaseFunction.java deleted file mode 100644 index 859a54a58..000000000 --- a/spring-cloud-function-deployer/src/it/simplestjar/src/main/java/function/example/UpperCaseFunction.java +++ /dev/null @@ -1,13 +0,0 @@ -package function.example; - -import java.util.function.Function; - -public class UpperCaseFunction implements Function { - - @Override - public String apply(String value) { - System.out.println("Uppercasing " + value); - return value.toUpperCase(); - } - -} diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java index dd69a401e..730c0b182 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionArchiveDeployer.java @@ -209,6 +209,7 @@ class FunctionArchiveDeployer extends JarLauncher { private boolean shouldLoadViaDeployerLoader(String name) { return name.startsWith("org.reactivestreams") || name.startsWith("reactor.") + || name.startsWith("io.cloudevents") || name.startsWith("org.springframework.messaging.Message") || name.startsWith("org.springframework.messaging.converter.MessageConverter"); } @@ -226,9 +227,6 @@ class FunctionArchiveDeployer extends JarLauncher { else { return null; } -// return StringUtils.hasText(functionProperties.getFunctionClass()) -// ? functionProperties.getFunctionClass().split(";") -// : new String[] {this.getArchive().getManifest().getMainAttributes().getValue("Function-Class")}; } catch (Exception e) { throw new IllegalStateException("Failed to discover function class name", e); diff --git a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java index 2bc6e341e..c9ae26965 100644 --- a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java +++ b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java @@ -33,10 +33,12 @@ import reactor.util.function.Tuples; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.deployer.resource.maven.MavenProperties; +import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -107,20 +109,22 @@ public class FunctionDeployerTests { public void testWithSimplestJar() throws Exception { String[] args = new String[] { "--spring.cloud.function.location=target/it/simplestjar/target/simplestjar-1.0.0.RELEASE.jar", - "--spring.cloud.function.function-class=function.example.UpperCaseFunction" }; + "--spring.cloud.function.function-class=function.example.EchoCloudEventFunction" }; ApplicationContext context = SpringApplication.run(DeployerApplication.class, args); FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - Function function = catalog.lookup("upperCaseFunction"); + Function, Message> function = catalog.lookup("echoCloudEventFunction"); - assertThat(function.apply("bob")).isEqualTo("BOB"); - assertThat(function.apply("stacy")).isEqualTo("STACY"); + String data = "{\"name\":\"Ricky\"}"; + Message inputMessage = CloudEventMessageBuilder + .withData(data) + .setId("123") + .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") + .setSource("https://spring.io/") + .setType("org.springframework") + .build(); - Function, Flux> functionAsFlux = catalog.lookup("upperCaseFunction"); - - List results = functionAsFlux.apply(Flux.just("bob", "stacy")).collectList().block(); - assertThat(results.get(0)).isEqualTo("BOB"); - assertThat(results.get(1)).isEqualTo("STACY"); + assertThat(new String(function.apply(inputMessage).getPayload())).isEqualTo(data); } @Test @@ -146,20 +150,28 @@ public class FunctionDeployerTests { public void testWithSimplestJarExploaded() throws Exception { String[] args = new String[] { "--spring.cloud.function.location=target/it/simplestjar/target/classes", - "--spring.cloud.function.function-class=function.example.UpperCaseFunction" }; + "--spring.cloud.function.function-class=function.example.EchoCloudEventFunction" }; ApplicationContext context = SpringApplication.run(DeployerApplication.class, args); FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - Function function = catalog.lookup("upperCaseFunction"); + Function, Message> function = catalog.lookup("echoCloudEventFunction"); - assertThat(function.apply("bob")).isEqualTo("BOB"); - assertThat(function.apply("stacy")).isEqualTo("STACY"); + String data = "{\"name\":\"Ricky\"}"; + Message inputMessage = CloudEventMessageBuilder + .withData(data) + .setId("123") + .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") + .setSource("https://spring.io/") + .setType("org.springframework") + .build(); - Function, Flux> functionAsFlux = catalog.lookup("upperCaseFunction"); + assertThat(new String(function.apply(inputMessage).getPayload())).isEqualTo(data); - List results = functionAsFlux.apply(Flux.just("bob", "stacy")).collectList().block(); - assertThat(results.get(0)).isEqualTo("BOB"); - assertThat(results.get(1)).isEqualTo("STACY"); + Function>, Flux>> functionAsFlux = catalog.lookup("echoCloudEventFunction"); + + List> results = functionAsFlux.apply(Flux.just(inputMessage)).collectList().block(); + assertThat(results.get(0).getPayload()).isEqualTo(data.getBytes()); + //assertThat(results.get(1)).isEqualTo("STACY"); } /*