Add Documentation for @StreamListener Dispatching
Fixes #820 Addressing PR comments
This commit is contained in:
committed by
Gary Russell
parent
086895f6f8
commit
02fdd7c9dd
@@ -455,7 +455,6 @@ As with other Spring Messaging methods, method arguments can be annotated with `
|
||||
[NOTE]
|
||||
====
|
||||
For methods which return data, you must use the `@SendTo` annotation to specify the output binding destination for data returned by the method:
|
||||
====
|
||||
|
||||
[source,java]
|
||||
----
|
||||
@@ -474,6 +473,44 @@ public class TransformProcessor {
|
||||
----
|
||||
====
|
||||
|
||||
===== Using @StreamListener for dispatching messages to multiple methods
|
||||
|
||||
Since version 1.2, Spring Cloud Stream supports dispatching messages to multiple `@StreamListener` methods registered on an input channel, based on a condition.
|
||||
|
||||
In order to be eligible to support conditional dispatching, a method must satisfy the follow conditions:
|
||||
|
||||
* it must not return a value
|
||||
* it must be an individual message handling method (reactive API methods are not supported)
|
||||
|
||||
The condition is specified via a SpEL expression in the `condition` attribute of the annotation and is evaluated for each message.
|
||||
All the handlers that match the condition will be invoked in the same thread and no assumption must be made about the order in which the invocations take place.
|
||||
|
||||
An example of using `@StreamListener` with dispatching conditions can be seen below.
|
||||
In this example, all the messages bearing a header `type` with the value `foo` will be dispatched to the `receiveFoo` method, and all the messages bearing a header `type` with the value `bar` will be dispatched to the `receiveBar` method.
|
||||
|
||||
[source,java]
|
||||
----
|
||||
@EnableBinding(Sink.class)
|
||||
@EnableAutoConfiguration
|
||||
public static class TestPojoWithAnnotatedArguments {
|
||||
|
||||
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'")
|
||||
public void receiveFoo(@Payload FooPojo fooPojo) {
|
||||
// handle the message
|
||||
}
|
||||
|
||||
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'")
|
||||
public void receiveBar(@Payload BarPojo barPojo) {
|
||||
// handle the message
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
Dispatching via `@StreamListener` conditions is only supported for handlers of individual messages, and not for reactive programming support (described below).
|
||||
====
|
||||
|
||||
==== Reactive Programming Support
|
||||
|
||||
Spring Cloud Stream also supports the use of reactive APIs where incoming and outgoing data is handled as continuous data flows.
|
||||
|
||||
144
spring-cloud-stream-reactive/dependency-reduced-pom.xml
Normal file
144
spring-cloud-stream-reactive/dependency-reduced-pom.xml
Normal file
@@ -0,0 +1,144 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<parent>
|
||||
<artifactId>spring-cloud-stream-parent</artifactId>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<version>1.2.0.BUILD-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-reactive</artifactId>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>2.4.3</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>io.reactivex:rxjava-reactive-streams</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
<relocations>
|
||||
<relocation>
|
||||
<pattern>rx.RxReactiveStreams</pattern>
|
||||
<shadedPattern>org.springframework.cloud.stream.reactive.shaded.rx.RxReactiveStreams</shadedPattern>
|
||||
</relocation>
|
||||
<relocation>
|
||||
<pattern>rx.internal</pattern>
|
||||
<shadedPattern>org.springframework.cloud.stream.reactive.shaded.rx.internal</shadedPattern>
|
||||
</relocation>
|
||||
</relocations>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
<version>1.2.0.BUILD-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
<version>3.0.4.RELEASE</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.reactivex</groupId>
|
||||
<artifactId>rxjava</artifactId>
|
||||
<version>1.1.10</version>
|
||||
<scope>compile</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.reactivex</groupId>
|
||||
<artifactId>rxjava-reactive-streams</artifactId>
|
||||
<version>1.2.1</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<version>1.5.1.BUILD-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>spring-boot-test</artifactId>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>spring-boot-test-autoconfigure</artifactId>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>json-path</artifactId>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>junit</artifactId>
|
||||
<groupId>junit</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<groupId>org.assertj</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<groupId>org.mockito</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>hamcrest-core</artifactId>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>hamcrest-library</artifactId>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>jsonassert</artifactId>
|
||||
<groupId>org.skyscreamer</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>spring-test</artifactId>
|
||||
<groupId>org.springframework</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-test-support</artifactId>
|
||||
<version>1.2.0.BUILD-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
|
||||
<version>1.2.0.BUILD-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>junit</artifactId>
|
||||
<groupId>junit</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
</project>
|
||||
|
||||
Reference in New Issue
Block a user