From 02fdd7c9ddfcbd3420d2e55f89b47cb3ee15ac3a Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Tue, 21 Feb 2017 10:53:24 -0500 Subject: [PATCH] Add Documentation for @StreamListener Dispatching Fixes #820 Addressing PR comments --- .../spring-cloud-stream-overview.adoc | 39 ++++- .../dependency-reduced-pom.xml | 144 ++++++++++++++++++ 2 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 spring-cloud-stream-reactive/dependency-reduced-pom.xml diff --git a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc index 4b8eaf845..d56b5831d 100644 --- a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc +++ b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc @@ -455,7 +455,6 @@ As with other Spring Messaging methods, method arguments can be annotated with ` [NOTE] ==== For methods which return data, you must use the `@SendTo` annotation to specify the output binding destination for data returned by the method: -==== [source,java] ---- @@ -474,6 +473,44 @@ public class TransformProcessor { ---- ==== +===== Using @StreamListener for dispatching messages to multiple methods + +Since version 1.2, Spring Cloud Stream supports dispatching messages to multiple `@StreamListener` methods registered on an input channel, based on a condition. + +In order to be eligible to support conditional dispatching, a method must satisfy the follow conditions: + +* it must not return a value +* it must be an individual message handling method (reactive API methods are not supported) + +The condition is specified via a SpEL expression in the `condition` attribute of the annotation and is evaluated for each message. +All the handlers that match the condition will be invoked in the same thread and no assumption must be made about the order in which the invocations take place. + +An example of using `@StreamListener` with dispatching conditions can be seen below. +In this example, all the messages bearing a header `type` with the value `foo` will be dispatched to the `receiveFoo` method, and all the messages bearing a header `type` with the value `bar` will be dispatched to the `receiveBar` method. + +[source,java] +---- +@EnableBinding(Sink.class) +@EnableAutoConfiguration +public static class TestPojoWithAnnotatedArguments { + + @StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'") + public void receiveFoo(@Payload FooPojo fooPojo) { + // handle the message + } + + @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'") + public void receiveBar(@Payload BarPojo barPojo) { + // handle the message + } +} +---- + +[NOTE] +==== +Dispatching via `@StreamListener` conditions is only supported for handlers of individual messages, and not for reactive programming support (described below). +==== + ==== Reactive Programming Support Spring Cloud Stream also supports the use of reactive APIs where incoming and outgoing data is handled as continuous data flows. diff --git a/spring-cloud-stream-reactive/dependency-reduced-pom.xml b/spring-cloud-stream-reactive/dependency-reduced-pom.xml new file mode 100644 index 000000000..c6b5551ee --- /dev/null +++ b/spring-cloud-stream-reactive/dependency-reduced-pom.xml @@ -0,0 +1,144 @@ + + + + spring-cloud-stream-parent + org.springframework.cloud + 1.2.0.BUILD-SNAPSHOT + + 4.0.0 + spring-cloud-stream-reactive + + + + maven-shade-plugin + 2.4.3 + + + package + + shade + + + + + io.reactivex:rxjava-reactive-streams + + + + + rx.RxReactiveStreams + org.springframework.cloud.stream.reactive.shaded.rx.RxReactiveStreams + + + rx.internal + org.springframework.cloud.stream.reactive.shaded.rx.internal + + + + + + + + + + + org.springframework.cloud + spring-cloud-stream + 1.2.0.BUILD-SNAPSHOT + compile + + + io.projectreactor + reactor-core + 3.0.4.RELEASE + compile + + + io.reactivex + rxjava + 1.1.10 + compile + true + + + io.reactivex + rxjava-reactive-streams + 1.2.1 + provided + + + org.springframework.boot + spring-boot-starter-test + 1.5.1.BUILD-SNAPSHOT + test + + + commons-logging + commons-logging + + + spring-boot-test + org.springframework.boot + + + spring-boot-test-autoconfigure + org.springframework.boot + + + json-path + com.jayway.jsonpath + + + junit + junit + + + assertj-core + org.assertj + + + mockito-core + org.mockito + + + hamcrest-core + org.hamcrest + + + hamcrest-library + org.hamcrest + + + jsonassert + org.skyscreamer + + + spring-test + org.springframework + + + + + org.springframework.cloud + spring-cloud-stream-test-support + 1.2.0.BUILD-SNAPSHOT + test + + + org.springframework.cloud + spring-cloud-stream-test-support-internal + 1.2.0.BUILD-SNAPSHOT + test + + + junit + junit + + + + + + 1.8 + + +