stream-applications/docs/FunctionComposition.adoc
Soby Chacko 4a5bb50659 README changes
* Adding steps for PR contributions in the README
2020-07-13 13:03:58 -04:00

= Function Composition
Spring Cloud Stream integrates with Spring Cloud Function's function-based programming model, which lets the
business logic of an application be modeled as a `java.util.function.Supplier`, a `java.util.function.Function`, or a `java.util.function.Consumer`,
representing the roles of a `Source`, a `Processor`, and a `Sink`, respectively.
Building on this foundation, we can extend existing `Source` and `Sink` applications by importing the configuration of an
existing `Source` or `Sink` and adding code that defines a `java.util.function.Function`. This opens up many powerful composition
possibilities.
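As a rough illustration of the idea, using only plain `java.util.function` types and no Spring classes, a supplier can be chained with a function to yield a new supplier. The class and method names below (`CompositionSketch`, `compose`) are hypothetical and exist only for this sketch; in the applications, the framework performs the equivalent composition based on the function definition.

```java
import java.util.function.Function;
import java.util.function.Supplier;

final class CompositionSketch {

    // Hypothetical helper: chains a Supplier with a Function to produce a new Supplier.
    static <T, R> Supplier<R> compose(Supplier<T> source, Function<T, R> processor) {
        return () -> processor.apply(source.get());
    }

    public static void main(String[] args) {
        Supplier<String> greeting = () -> "hello";             // plays the Source role
        Function<String, String> upper = String::toUpperCase;  // plays the Processor role
        Supplier<String> composed = compose(greeting, upper);  // still a Source
        System.out.println(composed.get());                    // prints HELLO
    }
}
```

The composed result is still a `Supplier`, which is why a source extended this way remains a source.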
For instance, the `Source` applications are auto-configured with link:../functions/function[functions], which may optionally
be included in a composite function definition.
With this, the same `Source` application can do any or all of the following without requiring a separate
standalone processor:
- execute SpEL transformations
- enrich message headers
- filter events
- produce task launch requests on upstream events
For example, when run as shown below, the `time` source performs a series of internal transformations and
publishes a task launch request every second to the RabbitMQ exchange named `time-test`.
```
java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar \
--spring.cloud.stream.bindings.output.destination=time-test \
--spring.cloud.stream.function.definition="timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction" \
--spel.function.expression="payload.length()" \
--header.enricher.headers=task-id=payload*2 \
--task.launch.request.task-name-expression="'task-'+headers['task-id']"
```
The transformed message would then look like this:
```
headers:
task-id: 34
content_type: application/json
Payload
49 bytes
Encoding: string
{"args":[],"deploymentProps":{},"name":"task-34"}
```
Let us unpack what is happening behind the scenes. We will start with the following function definition.
`timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction`
- Here, the function definition creates a composite `Supplier` beginning with the default `timeSupplier` Java function included
in this repository, which is the foundation for `time-source`.
- The `spelFunction` applies to a `Message` from which we can extract and transform the `payload` or `headers`, by following
standard Spring Integration conventions.
- The output of `spelFunction` is the length of the date-time String, `17`.
- From here, we apply the header-enricher Java function to add a message header, `task-id`, with the value of `payload*2`,
which is `34`.
- We use the `task-id` header to generate the task name, and thereby derive the task launch request,
using the SpEL expression `'task-'+headers['task-id']`, which evaluates to `task-34`.
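The arithmetic in the steps above can be sketched in plain Java. This is a simplified stand-in, not the applications' actual code: it assumes a date format of `MM/dd/yy HH:mm:ss` (chosen because it yields the 17-character string described above), it passes values straight through instead of using `Message` headers, and it returns only the task name, not the JSON task launch request. The names `TimePipelineSketch`, `LENGTH`, `TASK_ID`, `TASK_NAME`, and `pipeline` are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;
import java.util.function.Supplier;

final class TimePipelineSketch {

    // Assumed date format; it always produces a 17-character string.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("MM/dd/yy HH:mm:ss");

    // Stand-in for spelFunction with the expression payload.length()
    static final Function<String, Integer> LENGTH = String::length;

    // Stand-in for the header enricher: task-id = payload*2
    static final Function<Integer, Integer> TASK_ID = payload -> payload * 2;

    // Stand-in for the task name expression 'task-'+headers['task-id']
    static final Function<Integer, String> TASK_NAME = taskId -> "task-" + taskId;

    // Chains the three steps behind a time supplier, yielding a composite Supplier.
    static Supplier<String> pipeline(Supplier<String> timeSupplier) {
        Function<String, String> steps = LENGTH.andThen(TASK_ID).andThen(TASK_NAME);
        return () -> steps.apply(timeSupplier.get());
    }

    public static void main(String[] args) {
        Supplier<String> time = () -> LocalDateTime.now().format(FORMAT);
        System.out.println(pipeline(time).get()); // prints task-34
    }
}
```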
This is a somewhat contrived example, but the goal here is to highlight the power of function composition.
If you had a task definition in Spring Cloud Data Flow named `task-34`, you could build a `time | tasklauncher`
streaming data pipeline to launch that task every second.
Before the `3.0` release of Stream Applications, this composition required extensive customization, along with many more manual
configuration changes, extensions, and custom builds of the applications.
NOTE: Support for composite functions includes auto-configuration for conventional binding name mappings (`input` and `output`)
derived from the function definition and the presence of `spring.cloud.stream.bindings.output...`.
In this example, `--spring.cloud.stream.bindings.output.destination=time-test` is enabled behind the scenes by the auto-configured
property
`--spring.cloud.stream.function.bindings.timeSupplierspelFunctionheaderEnricherFunctiontaskLaunchRequestFunction-out-0=output`.
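The binding name in the property above appears to be the function definition with the `|` delimiters removed, followed by `-out-0`. A minimal sketch of that derivation follows, assuming this simple rule is all that applies here; the framework's own naming logic may cover more cases, and `BindingNameSketch` and `outputBindingName` are hypothetical names used only for illustration.

```java
final class BindingNameSketch {

    // Assumed rule: drop the pipe delimiters, then append "-out-0" for the first output.
    static String outputBindingName(String functionDefinition) {
        return functionDefinition.replace("|", "") + "-out-0";
    }

    public static void main(String[] args) {
        String definition = "timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction";
        // Prints the binding name that is mapped to "output" in the note above.
        System.out.println(outputBindingName(definition));
    }
}
```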