= Function Composition

Spring Cloud Stream integrates with Spring Cloud Function's function-based programming model, which lets the
business logic of an application be modeled as a `java.util.function.Supplier`, a `java.util.function.Function`, or a
`java.util.function.Consumer`, representing the roles of a `Source`, a `Processor`, and a `Sink`, respectively.

Building on this foundation, we can extend existing `Source` and `Sink` applications by importing the configuration of an
existing `Source` or `Sink` and adding code that defines a `java.util.function.Function`. This opens up many powerful
composition possibilities.
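The three roles, and the way a `Function` can be attached to a `Source` or a `Sink`, can be sketched with plain `java.util.function` types.
This is only an illustration of the idea, not how Spring Cloud Function wires things internally; the names `RolesSketch`, `composeSource`, and `composeSink` are hypothetical.

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration using only the JDK; not part of Stream Applications.
class RolesSketch {

    // Source + Function: the result is still a Supplier, so it still acts as a source.
    static <T, R> Supplier<R> composeSource(Supplier<T> source, Function<T, R> function) {
        return () -> function.apply(source.get());
    }

    // Function + Sink: the result is still a Consumer, so it still acts as a sink.
    static <T, R> Consumer<T> composeSink(Function<T, R> function, Consumer<R> sink) {
        return value -> sink.accept(function.apply(value));
    }

    public static void main(String[] args) {
        Supplier<String> source = () -> "hello";
        Function<String, String> upperCase = String::toUpperCase;
        List<String> received = new ArrayList<>();
        Consumer<String> sink = received::add;

        Supplier<String> extendedSource = composeSource(source, upperCase);
        Consumer<String> extendedSink = composeSink(upperCase, sink);

        System.out.println(extendedSource.get()); // HELLO
        extendedSink.accept("world");
        System.out.println(received); // [WORLD]
    }
}
----

In both cases the composed object keeps the role of the application it extends, which is why a composite that starts with a supplier can still be deployed as a source.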
For instance, the `Source` applications are auto-configured with link:../functions/function[functions], which can optionally
be included in a composite function definition.
With this, the same `Source` application can do any or all of the following without a separate standalone processor
application:

- execute SpEL transformations
- enrich message headers
- filter events
- produce task launch requests on upstream events
For example, when the `time` source runs as shown below, it performs a series of internal transformations and
publishes a task launch request every second to the RabbitMQ exchange named `time-test`.
[source,shell]
----
java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar \
--spring.cloud.stream.bindings.output.destination=time-test \
--spring.cloud.function.definition="timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction" \
--spel.function.expression="payload.length()" \
--header.enricher.headers=task-id=payload*2 \
--task.launch.request.task-name-expression="'task-'+headers['task-id']"
----
The transformed message then looks like this:
[source,text]
----
headers:
task-id: 34
content_type: application/json
Payload
49 bytes
Encoding: string
{"args":[],"deploymentProps":{},"name":"task-34"}
----
Let us unpack what is happening behind the scenes, starting with the function definition:

`timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction`

- The function definition creates a composite `Supplier` that begins with the default `timeSupplier` Java function included
in this repository, which is the foundation for `time-source`.
- The `spelFunction` is applied to a `Message`, from which we can extract and transform the `payload` or `headers` by following
standard Spring Integration conventions.
- The output of `spelFunction` is the length of the date-time String, `17`.
- From here, the header-enricher Java function adds a `task-id` Message header with the value of `payload*2`, which is `34`.
- Finally, the `task-id` header is used to generate the task name for the task launch request through the SpEL expression
`'task-'+headers['task-id']`, which evaluates to `task-34`.
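The arithmetic in the steps above can be traced with a plain Java sketch.
It does not use the actual `spelFunction`, `headerEnricherFunction`, or `taskLaunchRequestFunction` beans: the `Msg` class and every lambda are hypothetical stand-ins, and the `MM/dd/yy HH:mm:ss` date pattern is an assumption chosen because it yields a 17-character string.

[source,java]
----
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-ins that mirror the four steps; not the real function beans.
class PipelineSketch {

    // Minimal message: a payload plus headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Stand-in for timeSupplier; the date pattern is an assumption.
    static final Supplier<Msg> TIME = () ->
            new Msg(new SimpleDateFormat("MM/dd/yy HH:mm:ss").format(new Date()), new HashMap<>());

    // Stand-in for spelFunction with the expression payload.length().
    static final Function<Msg, Msg> LENGTH = m ->
            new Msg(((String) m.payload).length(), m.headers);

    // Stand-in for headerEnricherFunction with task-id=payload*2.
    static final Function<Msg, Msg> ENRICH = m -> {
        Map<String, Object> headers = new HashMap<>(m.headers);
        headers.put("task-id", (Integer) m.payload * 2);
        return new Msg(m.payload, headers);
    };

    // Stand-in for taskLaunchRequestFunction with 'task-'+headers['task-id'].
    static final Function<Msg, Msg> LAUNCH_REQUEST = m ->
            new Msg("{\"args\":[],\"deploymentProps\":{},\"name\":\"task-"
                    + m.headers.get("task-id") + "\"}", m.headers);

    // Plain-Java analogue of timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction.
    static Supplier<Msg> composite() {
        Function<Msg, Msg> functions = LENGTH.andThen(ENRICH).andThen(LAUNCH_REQUEST);
        return () -> functions.apply(TIME.get());
    }

    public static void main(String[] args) {
        Msg out = composite().get();
        System.out.println(out.headers.get("task-id")); // 34
        System.out.println(out.payload); // {"args":[],"deploymentProps":{},"name":"task-34"}
    }
}
----

Each `andThen` call corresponds to one `|` in the function definition, and the final result is still a `Supplier`, matching the composite `Supplier` described in the first step.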
This is a somewhat contrived example, but the goal is to highlight the power of function composition.
If you had a task definition named `task-34` in Spring Cloud Data Flow, you could build a `time | tasklauncher`
streaming data pipeline to launch that task every second.
Before the `3.0` release of Stream Applications, this kind of composition required extensive customization, including
manual configuration changes, extensions, and custom builds of the applications.
NOTE: Support for composite functions includes auto-configuration for conventional binding name mappings (`input` and `output`)
derived from the function definition and the presence of `spring.cloud.stream.bindings.output...`.
In this example, `--spring.cloud.stream.bindings.output.destination=time-test` is enabled behind the scenes by the auto-configured
property
`--spring.cloud.stream.function.bindings.timeSupplierspelFunctionheaderEnricherFunctiontaskLaunchRequestFunction-out-0=output`.
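The property name in the note follows from the function definition: the `|` separators are dropped and `-out-0` is appended.
A small sketch of that naming pattern, inferred from the example in this section rather than taken from the framework's code (`BindingNameSketch` and `outputBindingName` are hypothetical names):

[source,java]
----
// Hypothetical helper that mirrors the naming pattern seen in the note; not framework code.
class BindingNameSketch {

    // Drops the '|' separators from the definition and appends the output suffix.
    static String outputBindingName(String functionDefinition) {
        return functionDefinition.replace("|", "") + "-out-0";
    }

    public static void main(String[] args) {
        String definition = "timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction";
        // Prints the auto-configured property from the note, without the leading "--".
        System.out.println("spring.cloud.stream.function.bindings."
                + outputBindingName(definition) + "=output");
    }
}
----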