Files
spring-cloud-static/Dalston.SR5/multi/multi__programming_model.html
2017-12-22 20:14:47 -05:00

272 lines
50 KiB
HTML

<html><head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>25.&nbsp;Programming Model</title><link rel="stylesheet" type="text/css" href="css/manual-multipage.css"><meta name="generator" content="DocBook XSL Stylesheets V1.78.1"><link rel="home" href="multi_spring-cloud.html" title="Spring Cloud"><link rel="up" href="multi__spring_cloud_stream.html" title="Part&nbsp;IV.&nbsp;Spring Cloud Stream"><link rel="prev" href="multi__main_concepts.html" title="24.&nbsp;Main Concepts"><link rel="next" href="multi__binders.html" title="26.&nbsp;Binders"></head><body bgcolor="white" text="black" link="#0000FF" vlink="#840084" alink="#0000FF"><div class="navheader"><table width="100%" summary="Navigation header"><tr><th colspan="3" align="center">25.&nbsp;Programming Model</th></tr><tr><td width="20%" align="left"><a accesskey="p" href="multi__main_concepts.html">Prev</a>&nbsp;</td><th width="60%" align="center">Part&nbsp;IV.&nbsp;Spring Cloud Stream</th><td width="20%" align="right">&nbsp;<a accesskey="n" href="multi__binders.html">Next</a></td></tr></table><hr></div><div class="chapter"><div class="titlepage"><div><div><h2 class="title"><a name="_programming_model" href="#_programming_model"></a>25.&nbsp;Programming Model</h2></div></div></div><p>This section describes Spring Cloud Stream&#8217;s programming model.
Spring Cloud Stream provides a number of predefined annotations for declaring bound input and output channels as well as how to listen to channels.</p><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_declaring_and_binding_channels" href="#_declaring_and_binding_channels"></a>25.1&nbsp;Declaring and Binding Channels</h2></div></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_triggering_binding_via_literal_enablebinding_literal" href="#_triggering_binding_via_literal_enablebinding_literal"></a>25.1.1&nbsp;Triggering Binding Via <code class="literal">@EnableBinding</code></h3></div></div></div><p>You can turn a Spring application into a Spring Cloud Stream application by applying the <code class="literal">@EnableBinding</code> annotation to one of the application&#8217;s configuration classes.
The <code class="literal">@EnableBinding</code> annotation itself is meta-annotated with <code class="literal">@Configuration</code> and triggers the configuration of Spring Cloud Stream infrastructure:</p><pre class="programlisting">...
<em><span class="hl-annotation" style="color: gray">@Import(...)</span></em>
<em><span class="hl-annotation" style="color: gray">@Configuration</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableIntegration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <em><span class="hl-annotation" style="color: gray">@interface</span></em> EnableBinding {
...
Class&lt;?&gt;[] value() <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">default</span> {};
}</pre><p>The <code class="literal">@EnableBinding</code> annotation can take as parameters one or more interface classes that contain methods which represent bindable components (typically message channels).</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>In Spring Cloud Stream 1.0, the only supported bindable components are the Spring Messaging <code class="literal">MessageChannel</code> and its extensions <code class="literal">SubscribableChannel</code> and <code class="literal">PollableChannel</code>.
Future versions should extend this support to other types of components, using the same mechanism.
In this documentation, we will continue to refer to channels.</p></td></tr></table></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="__literal_input_literal_and_literal_output_literal" href="#__literal_input_literal_and_literal_output_literal"></a>25.1.2&nbsp;<code class="literal">@Input</code> and <code class="literal">@Output</code></h3></div></div></div><p>A Spring Cloud Stream application can have an arbitrary number of input and output channels defined in an interface as <code class="literal">@Input</code> and <code class="literal">@Output</code> methods:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Barista {
<em><span class="hl-annotation" style="color: gray">@Input</span></em>
SubscribableChannel orders();
<em><span class="hl-annotation" style="color: gray">@Output</span></em>
MessageChannel hotDrinks();
<em><span class="hl-annotation" style="color: gray">@Output</span></em>
MessageChannel coldDrinks();
}</pre><p>Using this interface as a parameter to <code class="literal">@EnableBinding</code> will trigger the creation of three bound channels named <code class="literal">orders</code>, <code class="literal">hotDrinks</code>, and <code class="literal">coldDrinks</code>, respectively.</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Barista.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> CafeConfiguration {
...
}</pre><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_customizing_channel_names" href="#_customizing_channel_names"></a>Customizing Channel Names</h4></div></div></div><p>Using the <code class="literal">@Input</code> and <code class="literal">@Output</code> annotations, you can specify a customized channel name for the channel, as shown in the following example:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Barista {
...
<em><span class="hl-annotation" style="color: gray">@Input("inboundOrders")</span></em>
SubscribableChannel orders();
}</pre><p>In this example, the created bound channel will be named <code class="literal">inboundOrders</code>.</p></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="__literal_source_literal_literal_sink_literal_and_literal_processor_literal" href="#__literal_source_literal_literal_sink_literal_and_literal_processor_literal"></a><code class="literal">Source</code>, <code class="literal">Sink</code>, and <code class="literal">Processor</code></h4></div></div></div><p>For easy addressing of the most common use cases, which involve either an input channel, an output channel, or both, Spring Cloud Stream provides three predefined interfaces out of the box.</p><p><code class="literal">Source</code> can be used for an application which has a single outbound channel.</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Source {
String OUTPUT = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"output"</span>;
<em><span class="hl-annotation" style="color: gray">@Output(Source.OUTPUT)</span></em>
MessageChannel output();
}</pre><p><code class="literal">Sink</code> can be used for an application which has a single inbound channel.</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Sink {
String INPUT = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"input"</span>;
<em><span class="hl-annotation" style="color: gray">@Input(Sink.INPUT)</span></em>
SubscribableChannel input();
}</pre><p><code class="literal">Processor</code> can be used for an application which has both an inbound channel and an outbound channel.</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Processor <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">extends</span> Source, Sink {
}</pre><p>Spring Cloud Stream provides no special handling for any of these interfaces; they are only provided out of the box.</p></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_accessing_bound_channels" href="#_accessing_bound_channels"></a>25.1.3&nbsp;Accessing Bound Channels</h3></div></div></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_injecting_the_bound_interfaces" href="#_injecting_the_bound_interfaces"></a>Injecting the Bound Interfaces</h4></div></div></div><p>For each bound interface, Spring Cloud Stream will generate a bean that implements the interface.
Invoking a <code class="literal">@Input</code>-annotated or <code class="literal">@Output</code>-annotated method of one of these beans will return the relevant bound channel.</p><p>The bean in the following example sends a message on the output channel when its <code class="literal">hello</code> method is invoked.
It invokes <code class="literal">output()</code> on the injected <code class="literal">Source</code> bean to retrieve the target channel.</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@Component</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> SendingBean {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> Source source;
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> SendingBean(Source source) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.source = source;
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> sayHello(String name) {
source.output().send(MessageBuilder.withPayload(name).build());
}
}</pre></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_injecting_channels_directly" href="#_injecting_channels_directly"></a>Injecting Channels Directly</h4></div></div></div><p>Bound channels can be also injected directly:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@Component</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> SendingBean {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> MessageChannel output;
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> SendingBean(MessageChannel output) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.output = output;
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> sayHello(String name) {
output.send(MessageBuilder.withPayload(name).build());
}
}</pre><p>If the name of the channel is customized on the declaring annotation, that name should be used instead of the method name.
Given the following declaration:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> CustomSource {
...
<em><span class="hl-annotation" style="color: gray">@Output("customOutput")</span></em>
MessageChannel output();
}</pre><p>The channel will be injected as shown in the following example:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@Component</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> SendingBean {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> MessageChannel output;
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> SendingBean(<em><span class="hl-annotation" style="color: gray">@Qualifier("customOutput")</span></em> MessageChannel output) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.output = output;
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> sayHello(String name) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.output.send(MessageBuilder.withPayload(name).build());
}
}</pre></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_producing_and_consuming_messages" href="#_producing_and_consuming_messages"></a>25.1.4&nbsp;Producing and Consuming Messages</h3></div></div></div><p>You can write a Spring Cloud Stream application using either Spring Integration annotations or Spring Cloud Stream&#8217;s <code class="literal">@StreamListener</code> annotation.
The <code class="literal">@StreamListener</code> annotation is modeled after other Spring Messaging annotations (such as <code class="literal">@MessageMapping</code>, <code class="literal">@JmsListener</code>, <code class="literal">@RabbitListener</code>, etc.) but adds content type management and type coercion features.</p><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_native_spring_integration_support" href="#_native_spring_integration_support"></a>Native Spring Integration Support</h4></div></div></div><p>Because Spring Cloud Stream is based on Spring Integration, Stream completely inherits Integration&#8217;s foundation and infrastructure as well as the component itself.
For example, you can attach the output channel of a <code class="literal">Source</code> to a <code class="literal">MessageSource</code>:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> TimerSource {
<em><span class="hl-annotation" style="color: gray">@Value("${format}")</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> String format;
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<em><span class="hl-annotation" style="color: gray">@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> MessageSource&lt;String&gt; timerMessageSource() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> () -&gt; <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> GenericMessage&lt;&gt;(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> SimpleDateFormat(format).format(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Date()));
}
}</pre><p>Or you can use a processor&#8217;s channels in a transformer:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> TransformProcessor {
<em><span class="hl-annotation" style="color: gray">@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Object transform(String message) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> message.toUpperCase();
}
}</pre></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_spring_integration_error_channel_support" href="#_spring_integration_error_channel_support"></a>Spring Integration Error Channel Support</h4></div></div></div><p>Spring Cloud Stream supports publishing error messages received by the Spring Integration global
error channel. Error messages sent to the <code class="literal">errorChannel</code> can be published to a specific destination
at the broker by configuring a binding for the outbound target named <code class="literal">error</code>. For example, to
publish error messages to a broker destination named "myErrors", provide the following property:
<code class="literal">spring.cloud.stream.bindings.error.destination=myErrors</code></p></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_using_streamlistener_for_automatic_content_type_handling" href="#_using_streamlistener_for_automatic_content_type_handling"></a>Using @StreamListener for Automatic Content Type Handling</h4></div></div></div><p>Complementary to its Spring Integration support, Spring Cloud Stream provides its own <code class="literal">@StreamListener</code> annotation, modeled after other Spring Messaging annotations (e.g. <code class="literal">@MessageMapping</code>, <code class="literal">@JmsListener</code>, <code class="literal">@RabbitListener</code>, etc.).
The <code class="literal">@StreamListener</code> annotation provides a simpler model for handling inbound messages, especially when dealing with use cases that involve content type management and type coercion.</p><p>Spring Cloud Stream provides an extensible <code class="literal">MessageConverter</code> mechanism for handling data conversion by bound channels and for, in this case, dispatching to methods annotated with <code class="literal">@StreamListener</code>.
The following is an example of an application which processes external <code class="literal">Vote</code> events:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> VoteHandler {
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
VotingService votingService;
<em><span class="hl-annotation" style="color: gray">@StreamListener(Sink.INPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> handle(Vote vote) {
votingService.record(vote);
}
}</pre><p>The distinction between <code class="literal">@StreamListener</code> and a Spring Integration <code class="literal">@ServiceActivator</code> is seen when considering an inbound <code class="literal">Message</code> that has a <code class="literal">String</code> payload and a <code class="literal">contentType</code> header of <code class="literal">application/json</code>.
In the case of <code class="literal">@StreamListener</code>, the <code class="literal">MessageConverter</code> mechanism will use the <code class="literal">contentType</code> header to parse the <code class="literal">String</code> payload into a <code class="literal">Vote</code> object.</p><p>As with other Spring Messaging methods, method arguments can be annotated with <code class="literal">@Payload</code>, <code class="literal">@Headers</code> and <code class="literal">@Header</code>.</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>For methods which return data, you must use the <code class="literal">@SendTo</code> annotation to specify the output binding destination for data returned by the method:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> TransformProcessor {
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
VotingService votingService;
<em><span class="hl-annotation" style="color: gray">@StreamListener(Processor.INPUT)</span></em>
<em><span class="hl-annotation" style="color: gray">@SendTo(Processor.OUTPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> VoteResult handle(Vote vote) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> votingService.record(vote);
}
}</pre></td></tr></table></div></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_using_streamlistener_for_dispatching_messages_to_multiple_methods" href="#_using_streamlistener_for_dispatching_messages_to_multiple_methods"></a>Using @StreamListener for dispatching messages to multiple methods</h4></div></div></div><p>Since version 1.2, Spring Cloud Stream supports dispatching messages to multiple <code class="literal">@StreamListener</code> methods registered on an input channel, based on a condition.</p><p>In order to be eligible to support conditional dispatching, a method must satisfy the follow conditions:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">it must not return a value</li><li class="listitem">it must be an individual message handling method (reactive API methods are not supported)</li></ul></div><p>The condition is specified via a SpEL expression in the <code class="literal">condition</code> attribute of the annotation and is evaluated for each message.
All the handlers that match the condition will be invoked in the same thread and no assumption must be made about the order in which the invocations take place.</p><p>An example of using <code class="literal">@StreamListener</code> with dispatching conditions can be seen below.
In this example, all the messages bearing a header <code class="literal">type</code> with the value <code class="literal">foo</code> will be dispatched to the <code class="literal">receiveFoo</code> method, and all the messages bearing a header <code class="literal">type</code> with the value <code class="literal">bar</code> will be dispatched to the <code class="literal">receiveBar</code> method.</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> TestPojoWithAnnotatedArguments {
<em><span class="hl-annotation" style="color: gray">@StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'")</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> receiveFoo(<em><span class="hl-annotation" style="color: gray">@Payload</span></em> FooPojo fooPojo) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// handle the message</span>
}
<em><span class="hl-annotation" style="color: gray">@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'")</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> receiveBar(<em><span class="hl-annotation" style="color: gray">@Payload</span></em> BarPojo barPojo) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// handle the message</span>
}
}</pre><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>Dispatching via <code class="literal">@StreamListener</code> conditions is only supported for handlers of individual messages, and not for reactive programming support (described below).</p></td></tr></table></div></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_reactive_programming_support" href="#_reactive_programming_support"></a>25.1.5&nbsp;Reactive Programming Support</h3></div></div></div><p>Spring Cloud Stream also supports the use of reactive APIs where incoming and outgoing data is handled as continuous data flows.
Support for reactive APIs is available via the <code class="literal">spring-cloud-stream-reactive</code>, which needs to be added explicitly to your project.</p><p>The programming model with reactive APIs is declarative, where instead of specifying how each individual message should be handled, you can use operators that describe functional transformations from inbound to outbound data flows.</p><p>Spring Cloud Stream supports the following reactive APIs:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">Reactor</li><li class="listitem">RxJava 1.x</li></ul></div><p>In the future, it is intended to support a more generic model based on Reactive Streams.</p><p>The reactive programming model is also using the <code class="literal">@StreamListener</code> annotation for setting up reactive handlers. The differences are that:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">the <code class="literal">@StreamListener</code> annotation must not specify an input or output, as they are provided as arguments and return values from the method;</li><li class="listitem">the arguments of the method must be annotated with <code class="literal">@Input</code> and <code class="literal">@Output</code> indicating which input or output will the incoming and respectively outgoing data flows connect to;</li><li class="listitem">the return value of the method, if any, will be annotated with <code class="literal">@Output</code>, indicating the input where data shall be sent.</li></ul></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>Reactive programming support requires Java 1.8.</p></td></tr></table></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>As of Spring Cloud Stream 1.1.1 and later (starting with release train Brooklyn.SR2), reactive programming support requires the use of Reactor 3.0.4.RELEASE and higher.
Earlier Reactor versions (including 3.0.1.RELEASE, 3.0.2.RELEASE and 3.0.3.RELEASE) are not supported.
<code class="literal">spring-cloud-stream-reactive</code> will transitively retrieve the proper version, but it is possible for the project structure to manage the version of the <code class="literal">io.projectreactor:reactor-core</code> to an earlier release, especially when using Maven.
This is the case for projects generated via Spring Initializr with Spring Boot 1.x, which will override the Reactor version to <code class="literal">2.0.8.RELEASE</code>.
In such cases you must ensure that the proper version of the artifact is released.
This can be simply achieved by adding a direct dependency on <code class="literal">io.projectreactor:reactor-core</code> with a version of <code class="literal">3.0.4.RELEASE</code> or later to your project.</p></td></tr></table></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>The use of term <code class="literal">reactive</code> is currently referring to the reactive APIs being used and not to the execution model being reactive (i.e. the bound endpoints are still using a 'push' rather than 'pull' model). While some backpressure support is provided by the use of Reactor, we do intend on the long run to support entirely reactive pipelines by the use of native reactive clients for the connected middleware.</p></td></tr></table></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_reactor_based_handlers" href="#_reactor_based_handlers"></a>Reactor-based handlers</h4></div></div></div><p>A Reactor based handler can have the following argument types:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">For arguments annotated with <code class="literal">@Input</code>, it supports the Reactor type <code class="literal">Flux</code>.
The parameterization of the inbound Flux follows the same rules as in the case of individual message handling: it can be the entire <code class="literal">Message</code>, a POJO which can be the <code class="literal">Message</code> payload, or a POJO which is the result of a transformation based on the <code class="literal">Message</code> content-type header. Multiple inputs are provided;</li><li class="listitem">For arguments annotated with <code class="literal">Output</code>, it supports the type <code class="literal">FluxSender</code> which connects a <code class="literal">Flux</code> produced by the method with an output. Generally speaking, specifying outputs as arguments is only recommended when the method can have multiple outputs;</li></ul></div><p>A Reactor based handler supports a return type of <code class="literal">Flux</code>, case in which it must be annotated with <code class="literal">@Output</code>. We recommend using the return value of the method when a single output flux is available.</p><p>Here is an example of a simple Reactor-based Processor.</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> UppercaseTransformer {
<em><span class="hl-annotation" style="color: gray">@StreamListener</span></em>
<em><span class="hl-annotation" style="color: gray">@Output(Processor.OUTPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Flux&lt;String&gt; receive(<em><span class="hl-annotation" style="color: gray">@Input(Processor.INPUT)</span></em> Flux&lt;String&gt; input) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> input.map(s -&gt; s.toUpperCase());
}
}</pre><p>The same processor using output arguments looks like this:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> UppercaseTransformer {
<em><span class="hl-annotation" style="color: gray">@StreamListener</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> receive(<em><span class="hl-annotation" style="color: gray">@Input(Processor.INPUT)</span></em> Flux&lt;String&gt; input,
<em><span class="hl-annotation" style="color: gray">@Output(Processor.OUTPUT)</span></em> FluxSender output) {
output.send(input.map(s -&gt; s.toUpperCase()));
}
}</pre></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_rxjava_1_x_support" href="#_rxjava_1_x_support"></a>RxJava 1.x support</h4></div></div></div><p>RxJava 1.x handlers follow the same rules as Reactor-based one, but will use <code class="literal">Observable</code> and <code class="literal">ObservableSender</code> arguments and return types.</p><p>So the first example above will become:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> UppercaseTransformer {
<em><span class="hl-annotation" style="color: gray">@StreamListener</span></em>
<em><span class="hl-annotation" style="color: gray">@Output(Processor.OUTPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Observable&lt;String&gt; receive(<em><span class="hl-annotation" style="color: gray">@Input(Processor.INPUT)</span></em> Observable&lt;String&gt; input) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> input.map(s -&gt; s.toUpperCase());
}
}</pre><p>The second example above will become:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> UppercaseTransformer {
<em><span class="hl-annotation" style="color: gray">@StreamListener</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> receive(<em><span class="hl-annotation" style="color: gray">@Input(Processor.INPUT)</span></em> Observable&lt;String&gt; input,
<em><span class="hl-annotation" style="color: gray">@Output(Processor.OUTPUT)</span></em> ObservableSender output) {
output.send(input.map(s -&gt; s.toUpperCase()));
}
}</pre></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_aggregation" href="#_aggregation"></a>25.1.6&nbsp;Aggregation</h3></div></div></div><p>Spring Cloud Stream provides support for aggregating multiple applications together, connecting their input and output channels directly and avoiding the additional cost of exchanging messages via a broker.
As of version 1.0 of Spring Cloud Stream, aggregation is supported only for the following types of applications:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><span class="emphasis"><em>sources</em></span> - applications with a single output channel named <code class="literal">output</code>, typically having a single binding of the type <code class="literal">org.springframework.cloud.stream.messaging.Source</code></li><li class="listitem"><span class="emphasis"><em>sinks</em></span> - applications with a single input channel named <code class="literal">input</code>, typically having a single binding of the type <code class="literal">org.springframework.cloud.stream.messaging.Sink</code></li><li class="listitem"><span class="emphasis"><em>processors</em></span> - applications with a single input channel named <code class="literal">input</code> and a single output channel named <code class="literal">output</code>, typically having a single binding of the type <code class="literal">org.springframework.cloud.stream.messaging.Processor</code>.</li></ul></div><p>They can be aggregated together by creating a sequence of interconnected applications, in which the output channel of an element in the sequence is connected to the input channel of the next element, if it exists.
A sequence can start with either a <span class="emphasis"><em>source</em></span> or a <span class="emphasis"><em>processor</em></span>, it can contain an arbitrary number of <span class="emphasis"><em>processors</em></span> and must end with either a <span class="emphasis"><em>processor</em></span> or a <span class="emphasis"><em>sink</em></span>.</p><p>Depending on the nature of the starting and ending element, the sequence may have one or more bindable channels, as follows:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">if the sequence starts with a source and ends with a sink, all communication between the applications is direct and no channels will be bound</li><li class="listitem">if the sequence starts with a processor, then its input channel will become the <code class="literal">input</code> channel of the aggregate and will be bound accordingly</li><li class="listitem">if the sequence ends with a processor, then its output channel will become the <code class="literal">output</code> channel of the aggregate and will be bound accordingly</li></ul></div><p>Aggregation is performed using the <code class="literal">AggregateApplicationBuilder</code> utility class, as in the following example.
Let&#8217;s consider a project in which we have source, processor and a sink, which may be defined in the project, or may be contained in one of the project&#8217;s dependencies.</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>Each component (source, sink or processor) in an aggregate application must be provided in a separate package if the configuration classes use <code class="literal">@SpringBootApplication</code>.
This is required to avoid cross-talk between applications, due to the classpath scanning performed by <code class="literal">@SpringBootApplication</code> on the configuration classes inside the same package.
In the example below, it can be seen that the Source, Processor and Sink application classes are grouped in separate packages.
A possible alternative is to provide the source, sink or processor configuration in a separate <code class="literal">@Configuration</code> class, avoid the use of <code class="literal">@SpringBootApplication</code>/<code class="literal">@ComponentScan</code> and use those for aggregation.</p></td></tr></table></div><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">package</span> com.app.mysink;
<em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> SinkApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> Logger logger = LoggerFactory.getLogger(SinkApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>);
<em><span class="hl-annotation" style="color: gray">@ServiceActivator(inputChannel=Sink.INPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> loggerSink(Object payload) {
logger.info(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Received: "</span> + payload);
}
}</pre><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">package</span> com.app.myprocessor;
<em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> ProcessorApplication {
<em><span class="hl-annotation" style="color: gray">@Transformer</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> String loggerSink(String payload) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> payload.toUpperCase();
}
}</pre><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">package</span> com.app.mysource;
<em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> SourceApplication {
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<em><span class="hl-annotation" style="color: gray">@InboundChannelAdapter(value = Source.OUTPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> String timerMessageSource() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> SimpleDateFormat().format(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Date());
}
}</pre><p>Each configuration can be used for running a separate component, but in this case they can be aggregated together as follows:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">package</span> com.app;
<em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> SampleAggregateApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> AggregateApplicationBuilder()
.from(SourceApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>).args(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"--fixedDelay=5000"</span>)
.via(ProcessorApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>)
.to(SinkApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>).args(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"--debug=true"</span>).run(args);
}
}</pre><p>The starting component of the sequence is provided as argument to the <code class="literal">from()</code> method.
The ending component of the sequence is provided as argument to the <code class="literal">to()</code> method.
Intermediate processors are provided as argument to the <code class="literal">via()</code> method.
Multiple processors of the same type can be chained together (e.g. for pipelining transformations with different configurations).
For each component, the builder can provide runtime arguments for Spring Boot configuration.</p><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_configuring_aggregate_application" href="#_configuring_aggregate_application"></a>Configuring aggregate application</h4></div></div></div><p>Spring Cloud Stream supports passing properties for the individual applications inside the aggregate application using 'namespace' as prefix.</p><p>The namespace can be set for applications as follows:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> SampleAggregateApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> AggregateApplicationBuilder()
.from(SourceApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>).namespace(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"source"</span>).args(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"--fixedDelay=5000"</span>)
.via(ProcessorApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>).namespace(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"processor1"</span>)
.to(SinkApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>).namespace(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"sink"</span>).args(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"--debug=true"</span>).run(args);
}
}</pre><p>Once the 'namespace' is set for the individual applications, the application properties with the <code class="literal">namespace</code> as prefix can be passed to the aggregate application using any supported property source (commandline, environment properties etc.,)</p><p>For instance, to override the default <code class="literal">fixedDelay</code> and <code class="literal">debug</code> properties of 'source' and 'sink' applications:</p><pre class="screen">java -jar target/MyAggregateApplication-0.0.1-SNAPSHOT.jar --source.fixedDelay=10000 --sink.debug=false</pre></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_configuring_binding_service_properties_for_non_self_contained_aggregate_application" href="#_configuring_binding_service_properties_for_non_self_contained_aggregate_application"></a>Configuring binding service properties for non self contained aggregate application</h4></div></div></div><p>The non self-contained aggregate application is bound to external broker via either or both the inbound/outbound components (typically, message channels) of the aggregate application while the applications inside the aggregate application are directly bound.
For example: a source application&#8217;s output and a processor application&#8217;s input are directly bound while the processor&#8217;s output channel is bound to an external destination at the broker.
When passing the binding service properties for non-self contained aggregate application, it is required to pass the binding service properties to the aggregate application instead of setting them as 'args' to individual child application.
For instance,</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> SampleAggregateApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> AggregateApplicationBuilder()
.from(SourceApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>).namespace(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"source"</span>).args(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"--fixedDelay=5000"</span>)
.via(ProcessorApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>).namespace(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"processor1"</span>).args(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"--debug=true"</span>).run(args);
}
}</pre><p>The binding properties like <code class="literal">--spring.cloud.stream.bindings.output.destination=processor-output</code> need to be specified as one of the external configuration properties (cmdline arg etc.,).</p></div></div></div></div><div class="navfooter"><hr><table width="100%" summary="Navigation footer"><tr><td width="40%" align="left"><a accesskey="p" href="multi__main_concepts.html">Prev</a>&nbsp;</td><td width="20%" align="center"><a accesskey="u" href="multi__spring_cloud_stream.html">Up</a></td><td width="40%" align="right">&nbsp;<a accesskey="n" href="multi__binders.html">Next</a></td></tr><tr><td width="40%" align="left" valign="top">24.&nbsp;Main Concepts&nbsp;</td><td width="20%" align="center"><a accesskey="h" href="multi_spring-cloud.html">Home</a></td><td width="40%" align="right" valign="top">&nbsp;26.&nbsp;Binders</td></tr></table></div></body></html>