= Function Composition

Spring Cloud Stream integrates with Spring Cloud Function's function-based programming model, which lets the
business logic of an application be modeled as a `java.util.function.Supplier`, a `java.util.function.Function`, or a `java.util.function.Consumer`,
representing the roles of a `Source`, a `Processor`, and a `Sink`, respectively.

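As a rough illustration of these three roles, the following plain-Java sketch wires a supplier, a function, and a consumer together by hand. The class and method names (`FunctionRoles`, `greetingSource`, `upperCaseProcessor`, `collectingSink`) are hypothetical and exist only for this example; no Spring binding is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionRoles {

    // Source role: produces a value without taking any input.
    static Supplier<String> greetingSource() {
        return () -> "hello";
    }

    // Processor role: maps an input value to an output value.
    static Function<String, String> upperCaseProcessor() {
        return String::toUpperCase;
    }

    // Sink role: consumes a value and returns nothing.
    static Consumer<String> collectingSink(List<String> target) {
        return target::add;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        String produced = greetingSource().get();
        String processed = upperCaseProcessor().apply(produced);
        collectingSink(received).accept(processed);
        System.out.println(received); // prints [HELLO]
    }
}
```

In a real application the framework, not hand-written glue code, connects these functions to message destinations.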
Building on this foundation, we can extend existing `Source` and `Sink` applications by importing the configuration of an
existing `Source` or `Sink` and adding code that defines a `java.util.function.Function`. This opens up many powerful composition
possibilities.

For instance, the `Source` applications are auto-configured with link:../functions/function[functions], which may optionally
be included in a composite function definition.

With this, the same `Source` application can do one or all of the following without having to be built out as a
standalone processor:

- execute SpEL transformations
- enrich message headers
- filter events
- produce task launch requests on upstream events

For example, the `time` source, when run as shown below, performs a series of internal transformations and
publishes a task launch request every second to the RabbitMQ exchange named `time-test`.

```
java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar \
--spring.cloud.stream.bindings.output.destination=time-test \
--spring.cloud.stream.function.definition="timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction" \
--spel.function.expression="payload.length()" \
--header.enricher.headers=task-id=payload*2 \
--task.launch.request.task-name-expression="'task-'+headers['task-id']"
```

Now, the transformed message would look like:

```
headers:
task-id: 34
content_type: application/json
Payload
49 bytes
Encoding: string
{"args":[],"deploymentProps":{},"name":"task-34"}
```

Let us unpack what is happening behind the scenes. We will start with the following function definition.

`timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction`

- Here, the function definition creates a composite `Supplier` beginning with the default `timeSupplier` Java function included
in this repository, which is the foundation for `time-source`.

- The `spelFunction` applies to a `Message` from which we can extract and transform the `payload` or `headers`, by following
standard Spring Integration conventions.

- The output of `spelFunction` is the length of the date-time String, `17`.

- From here, we apply the header-enricher Java function to add a Message header, `task-id`, with the value of `payload*2`.
That would be `34`.

- We use the `task-id` header to generate the task name and to programmatically derive the task launch request
using the SpEL expression `'task-'+headers['task-id']`, which evaluates to `task-34`.

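The arithmetic in the steps above can be traced with a small plain-Java sketch. It is a simplification under stated assumptions, not the actual implementation: each step is a hypothetical stand-in written with `java.util.function` types, the message is collapsed to a single value instead of a payload plus headers, and the date pattern `MM/dd/yy HH:mm:ss` is assumed only because it always yields a 17-character string.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;
import java.util.function.Supplier;

public class CompositionSketch {

    // Assumed pattern, chosen because it always formats to 17 characters.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("MM/dd/yy HH:mm:ss");

    // Stand-in for timeSupplier: emits the current date-time as a String.
    static Supplier<String> timeSupplier() {
        return () -> LocalDateTime.now().format(FORMAT);
    }

    // Stand-in for spelFunction with the expression payload.length().
    static Function<String, Integer> lengthFunction() {
        return String::length;
    }

    // Stand-in for the header enricher: computes the task-id value payload*2.
    static Function<Integer, Integer> taskIdFunction() {
        return payload -> payload * 2;
    }

    // Stand-in for the task name expression 'task-'+headers['task-id'].
    static Function<Integer, String> taskNameFunction() {
        return taskId -> "task-" + taskId;
    }

    // Composite supplier: the source followed by each function in order.
    static Supplier<String> composite() {
        Function<String, String> steps =
                lengthFunction().andThen(taskIdFunction()).andThen(taskNameFunction());
        Supplier<String> source = timeSupplier();
        return () -> steps.apply(source.get());
    }

    public static void main(String[] args) {
        System.out.println(composite().get()); // prints task-34
    }
}
```

Because the composite is itself a `Supplier`, the application as a whole still behaves as a source, which matches the first step of the walk-through.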
This is a somewhat contrived example, but the goal is to highlight the power of function composition.

If you had a task definition in Spring Cloud Data Flow with the name `task-34`, you could build a `time | tasklauncher`
streaming data pipeline to launch that task every second.

Before the `3.0` release of Stream Applications, this kind of composition required extensive customization, along with many manual
configuration changes, extensions, and custom builds of the applications.

NOTE: Support for composite functions includes auto-configuration for conventional binding name mappings (`input` and `output`)
derived from the function definition and the presence of `spring.cloud.stream.bindings.output...`.

In this example, `--spring.cloud.stream.bindings.output.destination=time-test` is enabled behind the scenes by the auto-configured
property
`--spring.cloud.stream.function.bindings.timeSupplierspelFunctionheaderEnricherFunctiontaskLaunchRequestFunction-out-0=output`.
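The shape of that auto-configured property can be reproduced with a short sketch. It only mirrors the naming visible in this example (function names concatenated without the `|` separators, followed by `-out-0`); the class and method names are hypothetical, and this is not the framework's own code.

```java
public class BindingNameSketch {

    // Drops the '|' separators from the definition and appends the
    // "-out-0" suffix seen in the example property above.
    static String outputBindingName(String functionDefinition) {
        return functionDefinition.replace("|", "") + "-out-0";
    }

    // Builds the property that maps the generated binding name to an alias.
    static String bindingProperty(String functionDefinition, String alias) {
        return "spring.cloud.stream.function.bindings."
                + outputBindingName(functionDefinition) + "=" + alias;
    }

    public static void main(String[] args) {
        String definition =
                "timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction";
        System.out.println(bindingProperty(definition, "output"));
    }
}
```

Running it with the definition from this example and the alias `output` prints the same property string shown above, without the leading `--`.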