[[spring-integration]]
= Spring Integration Interaction

The https://spring.io/projects/spring-integration[Spring Integration Framework] extends the Spring programming model to support the well-known Enterprise Integration Patterns.
It enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.
It also provides a high-level DSL to compose various operations (endpoints) into a logical integration flow.
With the lambda style of this DSL configuration, Spring Integration already makes extensive use of the `java.util.function` interfaces.
The `@MessagingGateway` proxy interface can also be exposed as a `Function` or `Consumer`, which, in a Spring Cloud Function environment, can be registered in the function catalog.
See the Spring Integration https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#functions-support[Reference Manual] for more information about its support for functions.

On the other hand, starting with version `4.0.3`, Spring Cloud Function introduces a `spring-cloud-function-integration` module, which provides a deeper, more cloud-specific, auto-configuration-based API for interacting with a `FunctionCatalog` from the Spring Integration DSL perspective.
The `FunctionFlowBuilder` is auto-configured and autowired with a `FunctionCatalog`, and it is the entry point to the function-specific DSL for a target `IntegrationFlow` instance.
In addition to the standard `IntegrationFlow.from()` factories (for convenience), the `FunctionFlowBuilder` exposes a `fromSupplier(String supplierDefinition)` factory to look up the target `Supplier` in the provided `FunctionCatalog`.
These factories lead to a `FunctionFlowDefinition`.
The `FunctionFlowDefinition` is an implementation of `IntegrationFlowExtension` and exposes the `apply(String functionDefinition)` and `accept(String consumerDefinition)` operators to look up a `Function` or a `Consumer` in the `FunctionCatalog`, respectively.
See their Javadocs for more information.

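To make the lookup-by-name idea concrete, here is a minimal plain-Java sketch that does not use Spring at all.
The `MiniCatalog` class and its `register`, `lookup`, and `runOnce` methods are hypothetical stand-ins for a `FunctionCatalog` and for the `fromSupplier()`, `apply()`, and `accept()` operators; the real module resolves functions from the application context and builds an `IntegrationFlow` rather than invoking the functions directly.

[source,java]
----
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a function catalog: maps names to functional objects.
final class MiniCatalog {

    private final Map<String, Object> registry = new HashMap<>();

    MiniCatalog register(String name, Object function) {
        registry.put(name, function);
        return this;
    }

    // Looks up a registered object and verifies that it has the expected type.
    <T> T lookup(Class<T> type, String name) {
        Object candidate = registry.get(name);
        if (!type.isInstance(candidate)) {
            throw new IllegalArgumentException(
                    "No " + type.getSimpleName() + " named '" + name + "'");
        }
        return type.cast(candidate);
    }

    // Resolves supplier -> function -> consumer by name and runs the chain once.
    @SuppressWarnings("unchecked")
    void runOnce(String supplierName, String functionName, String consumerName) {
        Supplier<Object> supplier = lookup(Supplier.class, supplierName);
        Function<Object, Object> function = lookup(Function.class, functionName);
        Consumer<Object> consumer = lookup(Consumer.class, consumerName);
        consumer.accept(function.apply(supplier.get()));
    }

    // Registers three functions under illustrative names and wires them by name.
    static List<String> demo() {
        List<String> results = new ArrayList<>();
        MiniCatalog catalog = new MiniCatalog()
                .register("simpleStringSupplier", (Supplier<String>) () -> "simple test data")
                .register("upperCaseFunction", (Function<String, String>) String::toUpperCase)
                .register("simpleStringConsumer", (Consumer<String>) results::add);
        catalog.runOnce("simpleStringSupplier", "upperCaseFunction", "simpleStringConsumer");
        return results;
    }
}
----

Calling `MiniCatalog.demo()` in this sketch yields a list holding the upper-cased supplier value, which mirrors the supplier, function, consumer ordering that the DSL operators express declaratively.
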
The following example shows the `FunctionFlowBuilder` in action, together with the rest of the `IntegrationFlow` API:

[source,java]
----
@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        // Each string below is a definition looked up in the FunctionCatalog.
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}
----

Since the `FunctionCatalog.lookup()` functionality is not limited to simple function names, function composition can also be used in the `apply()` and `accept()` operators:

[source,java]
----
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}
----

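As a rough illustration of what a composed definition such as `upperCaseFunction|simpleStringConsumer` means, the plain-Java sketch below splits the definition on `|` and chains the named functions, ending in a consumer.
The `CompositionSketch` class and its `compose` helper are hypothetical and only mimic the idea; the actual resolution is performed by `FunctionCatalog.lookup()`, which covers concerns that are not shown here.

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

final class CompositionSketch {

    // Builds a Consumer from "f1|f2|...|consumer":
    // every segment except the last names a Function, the last names a Consumer.
    static Consumer<String> compose(String definition,
            Map<String, Function<String, String>> functions,
            Map<String, Consumer<String>> consumers) {

        String[] names = definition.split("\\|");
        Function<String, String> chain = Function.identity();
        for (int i = 0; i < names.length - 1; i++) {
            Function<String, String> next = functions.get(names[i]);
            if (next == null) {
                throw new IllegalArgumentException("Unknown function: " + names[i]);
            }
            chain = chain.andThen(next);
        }
        String consumerName = names[names.length - 1];
        Consumer<String> terminal = consumers.get(consumerName);
        if (terminal == null) {
            throw new IllegalArgumentException("Unknown consumer: " + consumerName);
        }
        Function<String, String> composed = chain;
        // The composition of functions followed by a consumer is itself a consumer.
        return payload -> terminal.accept(composed.apply(payload));
    }

    static List<String> demo() {
        List<String> results = new ArrayList<>();
        Map<String, Function<String, String>> functions =
                Map.of("upperCaseFunction", String::toUpperCase);
        Map<String, Consumer<String>> consumers =
                Map.of("simpleStringConsumer", results::add);
        compose("upperCaseFunction|simpleStringConsumer", functions, consumers)
                .accept("hello");
        return results;
    }
}
----

The sketch shows why a composed definition can be passed to `accept()`: a function piped into a consumer behaves as a single consumer of the function's input type.
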
This API becomes more relevant when we add auto-configuration dependencies for predefined functions to our Spring Cloud applications.
For example, the https://spring.io/projects/spring-cloud-stream-applications[Stream Applications] project, in addition to application images, provides artifacts with functions for various integration use cases, such as `debezium-supplier`, `elasticsearch-consumer`, and `aggregator-function`.

The following configuration is based on the `http-supplier`, `spel-function` and `file-consumer`, respectively:

[source,java]
----
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}
----

All that remains is to add their configuration to `application.properties`, if necessary:

[source,properties]
----
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt
----