diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index c07e71d9c..7d69c4e9a 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -16,12 +16,14 @@ package org.springframework.cloud.function.context.catalog; +import java.lang.reflect.Array; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.lang.reflect.WildcardType; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -170,7 +172,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect function = this.compose(type, functionDefinition); } - if (function != null) { + if (function != null && !ObjectUtils.isEmpty(expectedOutputMimeTypes)) { function.expectedOutputContentType = expectedOutputMimeTypes; } else if (logger.isDebugEnabled()) { @@ -804,6 +806,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect : new OriginalMessageHolder(((Message) input).getPayload(), (Message) input); } else if (input instanceof Message) { + if (((Message) input).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull") + && !this.isInputTypeMessage()) { //TODO rework + return null; + } convertedInput = this.convertInputMessageIfNecessary((Message) input, type); if (convertedInput == null) { // give ConversionService a chance convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false); @@ -866,7 +872,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect else if (output instanceof Collection && this.isOutputTypeMessage()) { convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType); } - else if (!ObjectUtils.isEmpty(contentType)) { + else if (ObjectUtils.isArray(output) && !(output instanceof byte[])) { + convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType); + } + else { convertedOutput = messageConverter.toMessage(output, new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType[0]))); } @@ -1043,14 +1052,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @SuppressWarnings("unchecked") private Object convertMultipleOutputValuesIfNecessary(Object output, String[] contentType) { - Collection outputCollection = (Collection) output; - Collection convertedOutputCollection = output instanceof List ? new ArrayList<>() : new TreeSet<>(); + Collection outputCollection = ObjectUtils.isArray(output) ? Arrays.asList(output) : (Collection) output; + Collection convertedOutputCollection = outputCollection instanceof List ? new ArrayList<>() : new TreeSet<>(); + Type type = this.isOutputTypeMessage() ? FunctionTypeUtils.getGenericType(this.outputType) : this.outputType; for (Object outToConvert : outputCollection) { - Object result = this.convertOutputIfNecessary(outToConvert, this.outputType, contentType); - Assert.notNull(result, () -> "Failed to convert output '" + output + "'"); + Object result = this.convertOutputIfNecessary(outToConvert, type, contentType); + Assert.notNull(result, () -> "Failed to convert output '" + outToConvert + "'"); convertedOutputCollection.add(result); } - return convertedOutputCollection; + return ObjectUtils.isArray(output) ? convertedOutputCollection.toArray() : convertedOutputCollection; } /* diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java index 373e49678..73049b73c 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java @@ -17,13 +17,13 @@ package org.springframework.cloud.function.context.config; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Map; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.MimeType; @@ -55,7 +55,9 @@ public class CloudEventJsonMessageConverter extends JsonMessageConverter { return message.getPayload(); } Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint; - String jsonString = (String) message.getPayload(); + String jsonString = message.getPayload() instanceof String + ? (String) message.getPayload() + : new String((byte[]) message.getPayload(), StandardCharsets.UTF_8); Map mapEvent = this.mapper.fromJson(jsonString, Map.class); Object payload = this.mapper.fromJson(this.mapper.toJson(mapEvent.get("data")), convertToType); mapEvent.remove("data"); @@ -63,12 +65,6 @@ public class CloudEventJsonMessageConverter extends JsonMessageConverter { } } - @Override - protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, - @Nullable Object conversionHint) { - throw new UnsupportedOperationException("Temporarily not supported as this converter is work in progress"); - } - private boolean isBinary(Message message) { Map headers = message.getHeaders(); return headers.containsKey("source") && headers.containsKey("specversion") && headers.containsKey("type"); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java index b4ec238ff..3f0910bb2 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java @@ -62,11 +62,11 @@ public class BeanFactoryAwarePojoFunctionRegistryTests { Function, String> f2message = catalog.lookup("myFunction"); assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE"); - Function, Message> f2messageReturned = catalog.lookup("myFunction", "application/json"); - assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\""); - Function, Flux> f3 = catalog.lookup("myFunction"); assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO"); + + Function, Message> f2messageReturned = catalog.lookup("myFunction", "application/json"); + assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\""); } @Test @@ -85,11 +85,11 @@ public class BeanFactoryAwarePojoFunctionRegistryTests { Function, String> f2message = catalog.lookup("myFunctionLike"); assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE"); - Function, Message> f2messageReturned = catalog.lookup("myFunctionLike", "application/json"); - assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\""); - Function, Flux> f3 = catalog.lookup("myFunctionLike"); assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO"); + + Function, Message> f2messageReturned = catalog.lookup("myFunctionLike", "application/json"); + assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\""); } @Test diff --git a/spring-cloud-function-samples/function-sample-cloudevent/README.adoc b/spring-cloud-function-samples/function-sample-cloudevent/README.adoc index a9d65a584..8432a5c30 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/README.adoc +++ b/spring-cloud-function-samples/function-sample-cloudevent/README.adoc @@ -1,35 +1,37 @@ -## Cloud Events with Spring samples +## Examples of Cloud Events with Spring ### Introduction The current example uses spring-cloud-function framework as its core which allows users to only worry about functional aspects of their requirement while taking care-off non-functional aspects. For more information on Spring Cloud Function please visit our https://spring.io/projects/spring-cloud-function[project page]. -The example provides dependency and instructions to demonstrate several distinct invocation models: - - Direct function invocation - - Function as a REST endpoint - - Function as message handler (e.g., Kafka, RabbitMQ etc) - - Function invocation via RSocket +The example provides dependencies and instructions to demonstrate several distinct invocation models: + + - _Direct function invocation_ + - _Function as a REST endpoint_ + - _Function as message handler (e.g., Kafka, RabbitMQ etc)_ + - _Function invocation via RSocket_ The POM file defines all the necessary dependency in a segregated way, so you can choose the one you're interested in. -#### Direct function invocation +### Direct function invocation +TBD -#### Function as a REST endpoint +### Function as a REST endpoint Given that SCF allows function to be exposed as REST endpoints, you can post cloud event to any of the -functions by using function name as path (e.g., localhost:8080/) +functions by using function name as path (e.g., `localhost:8080/`) Here is an example of curl command posting a cloud event in binary-mode: [source, text] ---- curl -w'\n' localhost:8080/asPOJO \ - -H "ce-Specversion: 1.0" \ - -H "ce-Type: com.example.springevent" \ - -H "ce-Source: spring.io/spring-event" \ + -H "ce-specversion: 1.0" \ + -H "ce-type: com.example.springevent" \ + -H "ce-source: spring.io/spring-event" \ -H "Content-Type: application/json" \ - -H "ce-Id: 0001" \ + -H "ce-id: 0001" \ -d '{"releaseDate":"24-03-2004", "releaseName":"Spring Framework", "version":"1.0"}' ---- @@ -38,11 +40,7 @@ And here is an example of curl command posting a cloud event in structured-mode: [source, text] ---- curl -w'\n' localhost:8080/asString \ - -H "ce-Specversion: 1.0" \ - -H "ce-Type: com.example.springevent" \ - -H "ce-Source: spring.io/spring-event" \ -H "Content-Type: application/cloudevents+json" \ - -H "ce-Id: 0001" \ -d '{ "specversion" : "1.0", "type" : "org.springframework", @@ -57,15 +55,65 @@ curl -w'\n' localhost:8080/asString \ }' ---- -#### Function as message handler (e.g., Kafka, RabbitMQ etc) +### Function as message handler (e.g., Kafka, RabbitMQ etc) -Streaming support for Kafka and Rabbit is provided via Spring Cloud Stream framework (link). In fact we're only mentioning Kafka and Rabbit here as an example. -Streaming support is automatically provided for any existing binders (e.g., Solace, GCP, AWS etc) (link) -Binders are components of SCSt responsible to bind user code (e.g., function) to broker destinations so execution is triggered -by messages on broker destination and results of execution are sent to broker destinations. Binders also provide support consumer -groups and partitioning for both Kafka and RabbitMQ messaging systems. +Streaming support for Apache Kafka and RabbitMQ is provided via https://spring.io/projects/spring-cloud-stream[Spring Cloud Stream] framework. +In fact we're only mentioning Apache Kafka and RabbitMQ here as an example. +Streaming support is automatically provided for any existing binders (e.g., Solace, Google PubSub, Amazon Kinesis and many more). +Please see project page for for additional details on available binders. +Binders are components of Spring Cloud Stream responsible to bind user code (e.g., java function) to message broker destinations, so execution +is triggered by messages posted to the broker destination and results of execution are sent back to the broker destinations. Binders also provide +support for _consumer groups_, _partitioning_ and many other features. For more information on Spring Cloud Stream, Binders and available features +please visit our https://docs.spring.io/spring-cloud-stream/docs/3.1.0-SNAPSHOT/reference/html/[documentation page]. -#### Function invocation via RSocket +*RabbitMQ* +By simply declaring the following dependency +[source, xml] +---- + + org.springframework.cloud + spring-cloud-stream-binder-rabbit + 3.1.0-SNAPSHOT + +---- +. . . any function can now act as message handler bound to RabitMQ message broker. All you need to do is identify which function you intend to bind +by identifying it via `spring.cloud.function.definition` property. +[source, text] +---- +--spring.cloud.function.definition=asPOJOMessage +---- + +See link:src/main/resources/application.properties[application.properties] for more details. + +Assuming RabbitMQ broker is running on localhost:default_port, start the application and navigate to +http://localhost:15672/#/exchanges[RabbitMQ Dashboard]. Select `asPOJOMessage-in-0` exchange and: + +. . . post a binary-mode message by filling all the required Cloud Events headers and posting `data` element as _payload_ (see the screenshot below). + +image::images\rabbit-send-binary.png[] + +. . . post a structured-mode message by filling `contentType` header to the value of `application/cloudevents+json` while providing the +entire structure of Cloud Event message as _payload_ (see the screenshot below). + +[source, json] +---- +{ + "specversion" : "1.0", + "type" : "org.springframework", + "source" : "https://spring.io/", + "id" : "A234-1234-1234", + "datacontenttype" : "application/json", + "data" : { + "version" : "1.0", + "releaseName" : "Spring Framework", + "releaseDate" : "24-03-2004" + } +} +---- + +image::images\rabbit-send-structured.png[] + +### Function invocation via RSocket TBD \ No newline at end of file diff --git a/spring-cloud-function-samples/function-sample-cloudevent/images/rabbit-send-binary.png b/spring-cloud-function-samples/function-sample-cloudevent/images/rabbit-send-binary.png new file mode 100644 index 000000000..52bd15117 Binary files /dev/null and b/spring-cloud-function-samples/function-sample-cloudevent/images/rabbit-send-binary.png differ diff --git a/spring-cloud-function-samples/function-sample-cloudevent/images/rabbit-send-structured.png b/spring-cloud-function-samples/function-sample-cloudevent/images/rabbit-send-structured.png new file mode 100644 index 000000000..d5b45d3e7 Binary files /dev/null and b/spring-cloud-function-samples/function-sample-cloudevent/images/rabbit-send-structured.png differ diff --git a/spring-cloud-function-samples/function-sample-cloudevent/pom.xml b/spring-cloud-function-samples/function-sample-cloudevent/pom.xml index 52a3567cf..0c61fc292 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/pom.xml +++ b/spring-cloud-function-samples/function-sample-cloudevent/pom.xml @@ -45,11 +45,11 @@ - - - - - + + org.springframework.cloud + spring-cloud-stream-binder-rabbit + 3.1.0-SNAPSHOT + diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/resources/application.properties b/spring-cloud-function-samples/function-sample-cloudevent/src/main/resources/application.properties index 8b1378917..b1accb57d 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/resources/application.properties +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/resources/application.properties @@ -1 +1 @@ - +spring.cloud.function.definition=asPOJOMessage diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java index 1815e29b8..f75c8d2da 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java @@ -152,7 +152,7 @@ public class CloudeventDemoApplicationRESTTests { " \"releaseDate\" : \"24-03-2004\"\n" + " }\n" + "}"; - + System.out.println(payload); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));