spring-cloud-static/Finchley.SR4/multi/multi__programming_model.html
2019-06-11 10:07:49 +02:00


<html><head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>27.&nbsp;Programming Model</title><link rel="stylesheet" type="text/css" href="css/manual-multipage.css"><meta name="generator" content="DocBook XSL Stylesheets V1.79.1"><link rel="home" href="multi_spring-cloud.html" title="Spring Cloud"><link rel="up" href="multi__spring_cloud_stream.html" title="Part&nbsp;V.&nbsp;Spring Cloud Stream"><link rel="prev" href="multi__main_concepts.html" title="26.&nbsp;Main Concepts"><link rel="next" href="multi_spring-cloud-stream-overview-binders.html" title="28.&nbsp;Binders"></head><body bgcolor="white" text="black" link="#0000FF" vlink="#840084" alink="#0000FF"><div class="navheader"><table width="100%" summary="Navigation header"><tr><th colspan="3" align="center">27.&nbsp;Programming Model</th></tr><tr><td width="20%" align="left"><a accesskey="p" href="multi__main_concepts.html">Prev</a>&nbsp;</td><th width="60%" align="center">Part&nbsp;V.&nbsp;Spring Cloud Stream</th><td width="20%" align="right">&nbsp;<a accesskey="n" href="multi_spring-cloud-stream-overview-binders.html">Next</a></td></tr></table><hr></div><div class="chapter"><div class="titlepage"><div><div><h2 class="title"><a name="_programming_model" href="#_programming_model"></a>27.&nbsp;Programming Model</h2></div></div></div><p>To understand the programming model, you should be familiar with the following core concepts:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><span class="strong"><strong>Destination Binders:</strong></span> Components responsible for providing integration with the external messaging systems.</li><li class="listitem"><span class="strong"><strong>Destination Bindings:</strong></span> Bridge between the external messaging systems and application-provided <span class="emphasis"><em>Producers</em></span> and <span class="emphasis"><em>Consumers</em></span> of messages (created by the Destination Binders).</li><li class="listitem"><span 
class="strong"><strong>Message:</strong></span> The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).</li></ul></div><div class="informalfigure"><div class="mediaobject" align="center"><img src="images/SCSt-overview.png" align="middle" alt="SCSt overview"></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_destination_binders" href="#_destination_binders"></a>27.1&nbsp;Destination Binders</h2></div></div></div><p>Destination Binders are extension components of Spring Cloud Stream responsible for providing the necessary configuration and implementation to facilitate
integration with external messaging systems.
This integration is responsible for connectivity, delegation, and routing of messages to and from producers and consumers, data type conversion,
invocation of the user code, and more.</p><p>Binders handle a lot of the boilerplate responsibilities that would otherwise fall on your shoulders. However, to accomplish that, the binder still needs
some help in the form of a minimal yet required set of instructions from the user, which typically comes in the form of some type of configuration.</p><p>While it is out of scope of this section to discuss all of the available binder and binding configuration options (the rest of the manual covers them extensively),
<span class="emphasis"><em>Destination Binding</em></span> does require special attention. The next section discusses it in detail.</p></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_destination_bindings" href="#_destination_bindings"></a>27.2&nbsp;Destination Bindings</h2></div></div></div><p>As stated earlier, <span class="emphasis"><em>Destination Bindings</em></span> provide a bridge between the external messaging system and application-provided <span class="emphasis"><em>Producers</em></span> and <span class="emphasis"><em>Consumers</em></span>.</p><p>Applying the @EnableBinding annotation to one of the application&#8217;s configuration classes defines a destination binding.
The <code class="literal">@EnableBinding</code> annotation itself is meta-annotated with <code class="literal">@Configuration</code> and triggers the configuration of the Spring Cloud Stream infrastructure.</p><p>The following example shows a fully configured and functioning Spring Cloud Stream application that receives the payload of the message from the <code class="literal">INPUT</code>
destination as a <code class="literal">String</code> type (see <a class="xref" href="multi_content-type-management.html" title="30.&nbsp;Content Type Negotiation">Chapter&nbsp;30, <i>Content Type Negotiation</i></a> section), logs it to the console and sends it to the <code class="literal">OUTPUT</code> destination after converting it to upper case.</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> MyApplication {
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
        SpringApplication.run(MyApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, args);
    }
    <em><span class="hl-annotation" style="color: gray">@StreamListener(Processor.INPUT)</span></em>
    <em><span class="hl-annotation" style="color: gray">@SendTo(Processor.OUTPUT)</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> String handle(String value) {
        System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Received: "</span> + value);
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> value.toUpperCase();
    }
}</pre><p>As you can see, the <code class="literal">@EnableBinding</code> annotation can take one or more interface classes as parameters. The parameters are referred to as <span class="emphasis"><em>bindings</em></span>,
and they contain methods representing <span class="emphasis"><em>bindable components</em></span>.
These components are typically message channels (see <a class="link" href="https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html" target="_top">Spring Messaging</a>)
for channel-based binders (such as Rabbit, Kafka, and others). However, other types of bindings can
provide support for the native features of the corresponding technology. For example, the Kafka Streams binder (formerly known as KStream) allows native bindings directly to Kafka Streams
(see <a class="link" href="https://docs.spring.io/autorepo/docs/spring-cloud-stream-binder-kafka-docs/1.1.0.M1/reference/htmlsingle/" target="_top">Kafka Streams</a> for more details).</p><p>Spring Cloud Stream already provides <span class="emphasis"><em>binding</em></span> interfaces for typical message exchange contracts, which include:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><span class="strong"><strong>Sink:</strong></span> Identifies the contract for the message consumer by providing the destination from which the message is consumed.</li><li class="listitem"><span class="strong"><strong>Source:</strong></span> Identifies the contract for the message producer by providing the destination to which the produced message is sent.</li><li class="listitem"><span class="strong"><strong>Processor:</strong></span> Encapsulates both the sink and the source contracts by exposing two destinations that allow consumption and production of messages.</li></ul></div><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Sink {
    String INPUT = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"input"</span>;
    <em><span class="hl-annotation" style="color: gray">@Input(Sink.INPUT)</span></em>
    SubscribableChannel input();
}</pre><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Source {
    String OUTPUT = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"output"</span>;
    <em><span class="hl-annotation" style="color: gray">@Output(Source.OUTPUT)</span></em>
    MessageChannel output();
}</pre><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Processor <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">extends</span> Source, Sink {}</pre><p>While the preceding example satisfies the majority of cases, you can also define your own contracts by defining your own binding interfaces and using the <code class="literal">@Input</code> and <code class="literal">@Output</code>
annotations to identify the actual <span class="emphasis"><em>bindable components</em></span>.</p><p>For example:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Barista {
    <em><span class="hl-annotation" style="color: gray">@Input</span></em>
    SubscribableChannel orders();
    <em><span class="hl-annotation" style="color: gray">@Output</span></em>
    MessageChannel hotDrinks();
    <em><span class="hl-annotation" style="color: gray">@Output</span></em>
    MessageChannel coldDrinks();
}</pre><p>Using the interface shown in the preceding example as a parameter to <code class="literal">@EnableBinding</code> triggers the creation of the three bound channels named <code class="literal">orders</code>, <code class="literal">hotDrinks</code>, and <code class="literal">coldDrinks</code>,
respectively.</p><p>You can provide as many binding interfaces as you need, as arguments to the <code class="literal">@EnableBinding</code> annotation, as shown in the following example:</p><pre class="programlisting">@EnableBinding(value = { Orders.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, Payment.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> })</pre><p>In Spring Cloud Stream, the bindable <code class="literal">MessageChannel</code> components are the Spring Messaging <code class="literal">MessageChannel</code> (for outbound) and its extension, <code class="literal">SubscribableChannel</code>,
(for inbound).</p><p><span class="strong"><strong>Pollable Destination Binding</strong></span></p><p>While the previously described bindings support event-based message consumption, sometimes you need more control, such as over the rate of consumption.</p><p>Starting with version 2.0, you can bind a pollable consumer, as shown in the following example:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> PolledBarista {
    <em><span class="hl-annotation" style="color: gray">@Input</span></em>
    PollableMessageSource orders();
    . . .
}</pre><p>In this case, an implementation of <code class="literal">PollableMessageSource</code> is bound to the <code class="literal">orders</code> &#8220;channel&#8221;. See <a class="xref" href="multi__programming_model.html#spring-cloud-streams-overview-using-polled-consumers" title="27.3.4&nbsp;Using Polled Consumers">Section&nbsp;27.3.4, &#8220;Using Polled Consumers&#8221;</a> for more details.</p><p><span class="strong"><strong>Customizing Channel Names</strong></span></p><p>By using the <code class="literal">@Input</code> and <code class="literal">@Output</code> annotations, you can specify a customized channel name for the channel, as shown in the following example:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> Barista {
    <em><span class="hl-annotation" style="color: gray">@Input("inboundOrders")</span></em>
    SubscribableChannel orders();
}</pre><p>In the preceding example, the created bound channel is named <code class="literal">inboundOrders</code>.</p><p>Normally, you need not access individual channels or bindings directly (other than configuring them via the <code class="literal">@EnableBinding</code> annotation). However, there may be
times, such as testing or other corner cases, when you do.</p><p>Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface.
That means you can access the interfaces representing the bindings, or the individual channels, by autowiring either one in your application, as shown in the following two examples:</p><p><span class="emphasis"><em>Autowire Binding interface</em></span></p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> Source source;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
}</pre><p><span class="emphasis"><em>Autowire individual channel</em></span></p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> MessageChannel output;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
}</pre><p>You can also use Spring&#8217;s standard <code class="literal">@Qualifier</code> annotation for cases when channel names are customized or in multiple-channel scenarios that require specifically named channels.</p><p>The following example shows how to use the <code class="literal">@Qualifier</code> annotation in this way:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<em><span class="hl-annotation" style="color: gray">@Qualifier("myChannel")</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> MessageChannel output;</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="spring-cloud-stream-overview-producing-consuming-messages" href="#spring-cloud-stream-overview-producing-consuming-messages"></a>27.3&nbsp;Producing and Consuming Messages</h2></div></div></div><p>You can write a Spring Cloud Stream application by using either Spring Integration annotations or Spring Cloud Stream native annotations.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_spring_integration_support" href="#_spring_integration_support"></a>27.3.1&nbsp;Spring Integration Support</h3></div></div></div><p>Spring Cloud Stream is built on the concepts and patterns defined by <a class="link" href="https://www.enterpriseintegrationpatterns.com/" target="_top">Enterprise Integration Patterns</a> and relies
in its internal implementation on an already established and popular implementation of Enterprise Integration Patterns within the Spring portfolio of projects:
the <a class="link" href="https://projects.spring.io/spring-integration/" target="_top">Spring Integration</a> framework.</p><p>So it is only natural for it to support the foundation, semantics, and configuration options that are already established by Spring Integration.</p><p>For example, you can attach the output channel of a <code class="literal">Source</code> to a <code class="literal">MessageSource</code> and use the familiar <code class="literal">@InboundChannelAdapter</code> annotation, as follows:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> TimerSource {
    <em><span class="hl-annotation" style="color: gray">@Bean</span></em>
    <em><span class="hl-annotation" style="color: gray">@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> MessageSource&lt;String&gt; timerMessageSource() {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> () -&gt; <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> GenericMessage&lt;&gt;(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hello Spring Cloud Stream"</span>);
    }
}</pre><p>Similarly, you can use @Transformer or @ServiceActivator while providing an implementation of a message handler method for a <span class="emphasis"><em>Processor</em></span> binding contract, as shown in the following example:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> TransformProcessor {
    <em><span class="hl-annotation" style="color: gray">@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Object transform(String message) {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> message.toUpperCase();
    }
}</pre><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>While this may be skipping ahead a bit, it is important to understand that, when you consume from the same binding using the <code class="literal">@StreamListener</code> annotation, a pub-sub model is used.
Each method annotated with <code class="literal">@StreamListener</code> receives its own copy of a message, and each one has its own consumer group.
However, if you consume from the same binding by using one of the Spring Integration annotations (such as <code class="literal">@Aggregator</code>, <code class="literal">@Transformer</code>, or <code class="literal">@ServiceActivator</code>), those methods consume in a competing model.
No individual consumer group is created for each subscription.</p></td></tr></table></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_using_streamlistener_annotation" href="#_using_streamlistener_annotation"></a>27.3.2&nbsp;Using @StreamListener Annotation</h3></div></div></div><p>Complementary to its Spring Integration support, Spring Cloud Stream provides its own <code class="literal">@StreamListener</code> annotation, modeled after other Spring Messaging annotations
(<code class="literal">@MessageMapping</code>, <code class="literal">@JmsListener</code>, <code class="literal">@RabbitListener</code>, and others) and provides conveniences, such as content-based routing.</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> VoteHandler {
    <em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
    VotingService votingService;
    <em><span class="hl-annotation" style="color: gray">@StreamListener(Sink.INPUT)</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> handle(Vote vote) {
        votingService.record(vote);
    }
}</pre><p>As with other Spring Messaging methods, method arguments can be annotated with <code class="literal">@Payload</code>, <code class="literal">@Headers</code>, and <code class="literal">@Header</code>.</p><p>For methods that return data, you must use the <code class="literal">@SendTo</code> annotation to specify the output binding destination for data returned by the method, as shown in the following example:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> TransformProcessor {
    <em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
    VotingService votingService;
    <em><span class="hl-annotation" style="color: gray">@StreamListener(Processor.INPUT)</span></em>
    <em><span class="hl-annotation" style="color: gray">@SendTo(Processor.OUTPUT)</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> VoteResult handle(Vote vote) {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> votingService.record(vote);
    }
}</pre></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_using_streamlistener_for_content_based_routing" href="#_using_streamlistener_for_content_based_routing"></a>27.3.3&nbsp;Using @StreamListener for Content-based routing</h3></div></div></div><p>Spring Cloud Stream supports dispatching messages to multiple handler methods annotated with <code class="literal">@StreamListener</code> based on conditions.</p><p>To be eligible for conditional dispatching, a method must satisfy the following conditions:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">It must not return a value.</li><li class="listitem">It must be an individual message handling method (reactive API methods are not supported).</li></ul></div><p>The condition is specified by a SpEL expression in the <code class="literal">condition</code> argument of the annotation and is evaluated for each message.
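</p><p>As a rough, framework-free illustration of that per-message evaluation, the following sketch models conditional dispatching in plain Java. It is not how Spring Cloud Stream implements it: <code class="literal">ConditionalDispatchSketch</code>, <code class="literal">Msg</code>, and <code class="literal">register</code> are hypothetical names, and ordinary Java predicates stand in for SpEL expressions.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Plain-Java model of conditional dispatching; NOT Spring Cloud Stream code.
final class ConditionalDispatchSketch {

    // Hypothetical stand-in for a message: headers plus a still-unconverted payload.
    static final class Msg {
        final Map<String, String> headers;
        final byte[] payload;

        Msg(Map<String, String> headers, byte[] payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // One handler together with the condition that guards it.
    private static final class Route {
        final Predicate<Msg> condition;
        final Consumer<Msg> handler;

        Route(Predicate<Msg> condition, Consumer<Msg> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    // Registers a void handler guarded by a condition (the predicate plays the role of the SpEL expression).
    void register(Predicate<Msg> condition, Consumer<Msg> handler) {
        routes.add(new Route(condition, handler));
    }

    // Evaluates every condition against the message and invokes each matching
    // handler on the calling thread. Returns the number of handlers invoked.
    int dispatch(Msg message) {
        int invoked = 0;
        for (Route route : routes) {
            if (route.condition.test(message)) {
                route.handler.accept(message);
                invoked++;
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        ConditionalDispatchSketch dispatcher = new ConditionalDispatchSketch();
        List<String> calls = new ArrayList<>();
        dispatcher.register(m -> "bogey".equals(m.headers.get("type")), m -> calls.add("receiveBogey"));
        dispatcher.register(m -> "bacall".equals(m.headers.get("type")), m -> calls.add("receiveBacall"));

        dispatcher.dispatch(new Msg(Collections.singletonMap("type", "bogey"), new byte[0]));
        System.out.println(calls); // prints [receiveBogey]
    }
}
```

<p>The sketch happens to call handlers in registration order, which is an artifact of the model only. Note also that its conditions look only at headers, because the payload is still a raw <code class="literal">byte[]</code> at the time the condition is evaluated.</p><p>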
All the handlers that match the condition are invoked in the same thread, and no assumption must be made about the order in which the invocations take place.</p><p>In the following example of a <code class="literal">@StreamListener</code> with dispatching conditions, all the messages bearing a header <code class="literal">type</code> with the value <code class="literal">bogey</code> are dispatched to the
<code class="literal">receiveBogey</code> method, and all the messages bearing a header <code class="literal">type</code> with the value <code class="literal">bacall</code> are dispatched to the <code class="literal">receiveBacall</code> method.</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> TestPojoWithAnnotatedArguments {
    <em><span class="hl-annotation" style="color: gray">@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> receiveBogey(<em><span class="hl-annotation" style="color: gray">@Payload</span></em> BogeyPojo bogeyPojo) {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// handle the message</span>
    }
    <em><span class="hl-annotation" style="color: gray">@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> receiveBacall(<em><span class="hl-annotation" style="color: gray">@Payload</span></em> BacallPojo bacallPojo) {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// handle the message</span>
    }
}</pre><p><span class="strong"><strong>Content Type Negotiation in the Context of <code class="literal">condition</code></strong></span></p><p>It is important to understand some of the mechanics behind content-based routing using the <code class="literal">condition</code> argument of <code class="literal">@StreamListener</code>, especially in the context of the type of the message as a whole.
It may also help if you familiarize yourself with the <a class="xref" href="multi_content-type-management.html" title="30.&nbsp;Content Type Negotiation">Chapter&nbsp;30, <i>Content Type Negotiation</i></a> before you proceed.</p><p>Consider the following scenario:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> CatsAndDogs {
    <em><span class="hl-annotation" style="color: gray">@StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> bark(Dog dog) {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// handle the message</span>
    }
    <em><span class="hl-annotation" style="color: gray">@StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> purr(Cat cat) {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// handle the message</span>
    }
}</pre><p>The preceding code is perfectly valid. It compiles and deploys without any issues, yet it never produces the result you expect.</p><p>That is because you are testing something that does not yet exist in the state you expect: the payload of the message has not yet been converted from the
wire format (<code class="literal">byte[]</code>) to the desired type.
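</p><p>The effect can be reproduced without any Spring classes. The sketch below assumes that the SpEL expression <code class="literal">payload.class.simpleName</code> resolves to <code class="literal">getClass().getSimpleName()</code> on the payload object; the class name <code class="literal">WireFormatPayloadSketch</code> and the JSON body are made up for illustration.</p>

```java
import java.nio.charset.StandardCharsets;

// Plain-Java illustration of why a payload-type condition cannot match
// before conversion; no Spring classes are involved.
final class WireFormatPayloadSketch {

    // Mirrors what an expression such as payload.class.simpleName would see.
    static String simpleNameOf(Object payload) {
        return payload.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // Hypothetical JSON body of a Dog, as it might look on the wire.
        byte[] wirePayload = "{\"name\":\"Rex\"}".getBytes(StandardCharsets.UTF_8);

        System.out.println(simpleNameOf(wirePayload));               // prints byte[]
        System.out.println("Dog".equals(simpleNameOf(wirePayload))); // prints false
    }
}
```

<p>Because the simple name of the unconverted payload is <code class="literal">byte[]</code>, neither the <code class="literal">'Dog'</code> nor the <code class="literal">'Cat'</code> condition can ever be true.</p><p>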
In other words, it has not yet gone through the type conversion process described in <a class="xref" href="multi_content-type-management.html" title="30.&nbsp;Content Type Negotiation">Chapter&nbsp;30, <i>Content Type Negotiation</i></a>.</p><p>So, unless you use a SpEL expression that evaluates raw data (for example, the value of the first byte in the byte array), use message header-based expressions
(such as <code class="literal">condition = "headers['type']=='dog'"</code>).</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>At the moment, dispatching through <code class="literal">@StreamListener</code> conditions is supported only for channel-based binders (not for reactive programming support).
</p></td></tr></table></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="spring-cloud-streams-overview-using-polled-consumers" href="#spring-cloud-streams-overview-using-polled-consumers"></a>27.3.4&nbsp;Using Polled Consumers</h3></div></div></div><p>When using polled consumers, you poll the <code class="literal">PollableMessageSource</code> on demand.
Consider the following example of a polled consumer:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> PolledConsumer {
    <em><span class="hl-annotation" style="color: gray">@Input</span></em>
    PollableMessageSource destIn();
    <em><span class="hl-annotation" style="color: gray">@Output</span></em>
    MessageChannel destOut();
}</pre><p>Given the polled consumer in the preceding example, you might use it as follows:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> args -&gt; {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">while</span> (someCondition()) {
            <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">try</span> {
                <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (!destIn.poll(m -&gt; {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> GenericMessage&lt;&gt;(newPayload));
                })) {
                    Thread.sleep(<span class="hl-number">1000</span>);
                }
            }
            <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">catch</span> (Exception e) {
                <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// handle failure (throw an exception to reject the message);</span>
            }
        }
    };
}</pre><p>The <code class="literal">PollableMessageSource.poll()</code> method takes a <code class="literal">MessageHandler</code> argument (often a lambda expression, as shown here).
It returns <code class="literal">true</code> if the message was received and successfully processed.</p><p>As with message-driven consumers, if the <code class="literal">MessageHandler</code> throws an exception, messages are published to error channels, as discussed in <span class="quote">&#8220;<span class="quote"><a class="xref" href="">???</a></span>&#8221;</span>.</p><p>Normally, the <code class="literal">poll()</code> method acknowledges the message when the <code class="literal">MessageHandler</code> exits.
If the method exits abnormally, the message is rejected (not re-queued).
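</p><p>The default contract described above can be summarized with a small plain-Java model. This is only a sketch and not the real <code class="literal">PollableMessageSource</code>: <code class="literal">PollAckSketch</code>, its in-memory queue, and the <code class="literal">Status</code> enum are hypothetical, and letting the handler's exception propagate to the caller is an assumption of this model.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the default poll/acknowledge contract; NOT the real PollableMessageSource.
final class PollAckSketch {

    // Hypothetical outcome recorded for each polled message.
    enum Status { ACCEPT, REJECT }

    private final Deque<String> queue = new ArrayDeque<>();
    final List<Status> outcomes = new ArrayList<>();

    void enqueue(String payload) {
        queue.add(payload);
    }

    // Returns false when nothing is available; otherwise runs the handler,
    // accepts the message on normal exit, and rejects it (without re-queueing)
    // when the handler throws.
    boolean poll(Consumer<String> handler) {
        String next = queue.poll();
        if (next == null) {
            return false;
        }
        try {
            handler.accept(next);
        } catch (RuntimeException e) {
            outcomes.add(Status.REJECT); // rejected, not put back on the queue
            throw e;                     // assumption of this sketch: the failure reaches the caller
        }
        outcomes.add(Status.ACCEPT);
        return true;
    }

    public static void main(String[] args) {
        PollAckSketch source = new PollAckSketch();
        source.enqueue("espresso");

        System.out.println(source.poll(p -> System.out.println("Received: " + p))); // prints "Received: espresso", then true
        System.out.println(source.poll(p -> { }));                                  // prints false
        System.out.println(source.outcomes);                                        // prints [ACCEPT]
    }
}
```

<p>In this model a caller that gets <code class="literal">false</code> back knows there was nothing to process, which is why the polling loops in this section sleep before trying again.</p><p>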
You can override that behavior by taking responsibility for the acknowledgment, as shown in the following example:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> args -&gt; {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">while</span> (someCondition()) {
            <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (!dest1In.poll(m -&gt; {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// e.g. hand off to another thread which can perform the ack</span>
                <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// or acknowledge(Status.REQUEUE)</span>
            })) {
                Thread.sleep(<span class="hl-number">1000</span>);
            }
        }
    };
}</pre><div class="important" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Important"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Important]" src="images/important.png"></td><th align="left">Important</th></tr><tr><td align="left" valign="top"><p>You must <code class="literal">ack</code> (or <code class="literal">nack</code>) the message at some point, to avoid resource leaks.</p></td></tr></table></div><div class="important" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Important"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Important]" src="images/important.png"></td><th align="left">Important</th></tr><tr><td align="left" valign="top"><p>Some messaging systems (such as Apache Kafka) maintain a simple offset in a log. If a delivery fails and is re-queued with <code class="literal">StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);</code>, any later successfully ack&#8217;d messages are redelivered.</p></td></tr></table></div><p>There is also an overloaded <code class="literal">poll</code> method, for which the definition is as follows:</p><pre class="programlisting">poll(MessageHandler handler, ParameterizedTypeReference&lt;?&gt; type)</pre><p>The <code class="literal">type</code> is a conversion hint that allows the incoming message payload to be converted, as shown in the following example:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">boolean</span> result = pollableSource.poll(received -&gt; {
Map&lt;String, Foo&gt; payload = (Map&lt;String, Foo&gt;) received.getPayload();
...
}, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> ParameterizedTypeReference&lt;Map&lt;String, Foo&gt;&gt;() {});</pre></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="spring-cloud-stream-overview-error-handling" href="#spring-cloud-stream-overview-error-handling"></a>27.4&nbsp;Error Handling</h2></div></div></div><p>Errors happen, and Spring Cloud Stream provides several flexible mechanisms to handle them.
The error handling comes in two flavors:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><span class="strong"><strong>application:</strong></span> The error handling is done within the application (custom error handler).</li><li class="listitem"><span class="strong"><strong>system:</strong></span> The error handling is delegated to the binder (re-queue, DLQ, and others). Note that the techniques depend on the binder implementation and the
capability of the underlying messaging middleware.</li></ul></div><p>Spring Cloud Stream uses the <a class="link" href="https://github.com/spring-projects/spring-retry" target="_top">Spring Retry</a> library to facilitate successful message processing. See <a class="xref" href="multi__programming_model.html#_retry_template" title="27.4.3&nbsp;Retry Template">Section&nbsp;27.4.3, &#8220;Retry Template&#8221;</a> for more details.
However, when all else fails, the exceptions thrown by the message handlers are propagated back to the binder. At that point, the binder invokes a custom error handler or communicates
the error back to the messaging system (re-queue, DLQ, and others).</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_application_error_handling" href="#_application_error_handling"></a>27.4.1&nbsp;Application Error Handling</h3></div></div></div><p>There are two types of application-level error handling. Errors can be handled at each binding subscription or a global handler can handle all the binding subscription errors. Let&#8217;s review the details.</p><div class="figure"><a name="d0e8550" href="#d0e8550"></a><p class="title"><b>Figure&nbsp;27.1.&nbsp;A Spring Cloud Stream Sink Application with Custom and Global Error Handlers</b></p><div class="figure-contents"><div class="mediaobject" align="center"><img src="images/custom_vs_global_error_channels.png" align="middle" alt="custom vs global error channels"></div></div></div><br class="figure-break"><p>For each input binding, Spring Cloud Stream creates a dedicated error channel with the following semantics <code class="literal">&lt;destinationName&gt;.errors</code>.</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>The <code class="literal">&lt;destinationName&gt;</code> consists of the name of the binding (such as <code class="literal">input</code>) and the name of the group (such as <code class="literal">myGroup</code>).</p></td></tr></table></div><p>Consider the following:</p><pre class="programlisting">spring.cloud.stream.bindings.input.group=myGroup</pre><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@StreamListener(Sink.INPUT)</span></em> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// destination name 'input.myGroup'</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> handle(Person value) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">throw</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> RuntimeException(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"BOOM!"</span>);
}
<em><span class="hl-annotation" style="color: gray">@ServiceActivator(inputChannel = Sink.INPUT + ".myGroup.errors")</span></em> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">//channel name 'input.myGroup.errors'</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> error(Message&lt;?&gt; message) {
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Handling ERROR: "</span> + message);
}</pre><p>In the preceding example the destination name is <code class="literal">input.myGroup</code> and the dedicated error channel name is <code class="literal">input.myGroup.errors</code>.</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>The use of @StreamListener annotation is intended specifically to define bindings that bridge internal channels and external destinations. Given that the destination
specific error channel does NOT have an associated external destination, such channel is a prerogative of Spring Integration (SI). This means that the handler
for such a destination must be defined using one of the SI handler annotations (for example, @ServiceActivator or @Transformer).</p></td></tr></table></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>If <code class="literal">group</code> is not specified, an anonymous group is used (something like <code class="literal">input.anonymous.2K37rb06Q6m2r51-SPIDDQ</code>), which is not suitable for error
handling scenarios, since you don&#8217;t know what it&#8217;s going to be until the destination is created.</p></td></tr></table></div><p>Also, if you bind to an existing destination, such as:</p><pre class="programlisting">spring.cloud.stream.bindings.input.destination=myFooDestination
spring.cloud.stream.bindings.input.group=myGroup</pre><p>the full destination name is <code class="literal">myFooDestination.myGroup</code> and the dedicated error channel name is <code class="literal">myFooDestination.myGroup.errors</code>.</p><p>Back to the example&#8230;</p><p>The <code class="literal">handle(..)</code> method, which subscribes to the channel named <code class="literal">input</code>, throws an exception. Given that there is also a subscriber to the error channel <code class="literal">input.myGroup.errors</code>,
all error messages are handled by this subscriber.</p><p>If you have multiple bindings, you may want to have a single error handler. Spring Cloud Stream automatically provides support for
a <span class="emphasis"><em>global error channel</em></span> by bridging each individual error channel to the channel named <code class="literal">errorChannel</code>, allowing a single subscriber to handle all errors,
as shown in the following example:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@StreamListener("errorChannel")</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> error(Message&lt;?&gt; message) {
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Handling ERROR: "</span> + message);
}</pre><p>This may be a convenient option if error handling logic is the same regardless of which handler produced the error.</p><p>Also, error messages sent to the <code class="literal">errorChannel</code> can be published to the specific destination at the broker by configuring a binding named <code class="literal">error</code> for the outbound target.
This option provides a mechanism to automatically send error messages to another application bound to that destination or for later retrieval (for example, audit).
For example, to publish error messages to a broker destination named <code class="literal">myErrors</code>, set the following property:</p><pre class="programlisting">spring.cloud.stream.bindings.error.destination=myErrors</pre><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>The ability to bridge the global error channel to a broker destination essentially provides a mechanism which connects
the <span class="emphasis"><em>application-level</em></span> error handling with the <span class="emphasis"><em>system-level</em></span> error handling.</p></td></tr></table></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_system_error_handling" href="#_system_error_handling"></a>27.4.2&nbsp;System Error Handling</h3></div></div></div><p>System-level error handling implies that the errors are communicated back to the messaging system and, given that not every messaging system
is the same, the capabilities may differ from binder to binder.</p><p>That said, in this section we explain the general idea behind system-level error handling and use the Rabbit binder as an example. Note that the Kafka binder provides similar
support, although some configuration properties do differ. Also, for more details and configuration options, see the individual binder&#8217;s documentation.</p><p>If no internal error handlers are configured, the errors propagate to the binders, and the binders subsequently propagate those errors back to the messaging system.
Depending on the capabilities of the messaging system such a system may <span class="emphasis"><em>drop</em></span> the message, <span class="emphasis"><em>re-queue</em></span> the message for re-processing or <span class="emphasis"><em>send the failed message to DLQ</em></span>.
Both Rabbit and Kafka support these concepts. However, other binders may not, so refer to your individual binder&#8217;s documentation for details on supported system-level
error-handling options.</p><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_drop_failed_messages" href="#_drop_failed_messages"></a>Drop Failed Messages</h4></div></div></div><p>By default, if no additional system-level configuration is provided, the messaging system drops the failed message.
While acceptable in some cases, for most cases, it is not, and we need some recovery mechanism to avoid message loss.</p></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_dlq_dead_letter_queue" href="#_dlq_dead_letter_queue"></a>DLQ - Dead Letter Queue</h4></div></div></div><p>DLQ allows failed messages to be sent to a special destination: the <span class="emphasis"><em>Dead Letter Queue</em></span>.</p><p>When configured, failed messages are sent to this destination for subsequent re-processing or auditing and reconciliation.</p><p>For example, continuing with the previous example, to set up the DLQ with the Rabbit binder, you need to set the following property:</p><pre class="programlisting">spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true</pre><p>Keep in mind that, in the above property, <code class="literal">input</code> corresponds to the name of the input destination binding.
The <code class="literal">consumer</code> indicates that it is a consumer property and <code class="literal">auto-bind-dlq</code> instructs the binder to configure DLQ for <code class="literal">input</code>
destination, which results in an additional Rabbit queue named <code class="literal">input.myGroup.dlq</code>.</p><p>Once configured, all failed messages are routed to this queue with an error message similar to the following:</p><pre class="programlisting">delivery_mode: 1
headers:
x-death:
count: 1
reason: rejected
queue: input.hello
time: 1522328151
exchange:
routing-keys: input.myGroup
Payload {"name&#8221;:"Bob"}</pre><p>As you can see from the above, your original message is preserved for further actions.</p><p>However, one thing you may have noticed is that there is limited information on the original issue with the message processing. For example, you do not see a stack
trace corresponding to the original error.
To get more relevant information about the original error, you must set an additional property:</p><pre class="programlisting">spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true</pre><p>Doing so forces the internal error handler to intercept the error message and add additional information to it before publishing it to DLQ.
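</p><p>As a sketch, the Rabbit DLQ settings discussed so far can be combined for the same binding. The binding name <code class="literal">input</code> and the group <code class="literal">myGroup</code> are carried over from the earlier examples; adjust them to match your own application:</p><pre class="programlisting">spring.cloud.stream.bindings.input.group=myGroup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true</pre><p>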
Once configured, you can see that the error message contains more information relevant to the original error, as follows:</p><pre class="programlisting">delivery_mode: 2
headers:
x-original-exchange:
x-exception-message: has an error
x-original-routingKey: input.myGroup
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at. . . . .
Payload {"name&#8221;:"Bob"}</pre><p>This effectively combines application-level and system-level error handling to further assist with downstream troubleshooting mechanics.</p></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_re_queue_failed_messages" href="#_re_queue_failed_messages"></a>Re-queue Failed Messages</h4></div></div></div><p>As mentioned earlier, the currently supported binders (Rabbit and Kafka) rely on <code class="literal">RetryTemplate</code> to facilitate successful message processing. See <a class="xref" href="multi__programming_model.html#_retry_template" title="27.4.3&nbsp;Retry Template">Section&nbsp;27.4.3, &#8220;Retry Template&#8221;</a> for details.
However, when the <code class="literal">max-attempts</code> property is set to 1, internal reprocessing of the message is disabled. In that case, you can facilitate message re-processing (re-tries)
by instructing the messaging system to re-queue the failed message. Once re-queued, the failed message is sent back to the original handler, essentially creating a retry loop.</p><p>This option may be feasible for cases where the nature of the error is related to some sporadic yet short-term unavailability of some resource.</p><p>To accomplish that, you must set the following properties:</p><pre class="programlisting">spring.cloud.stream.bindings.input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true</pre><p>In the preceding example, <code class="literal">max-attempts</code> is set to 1, which essentially disables internal re-tries, and <code class="literal">requeue-rejected</code> (short for <span class="emphasis"><em>requeue rejected messages</em></span>) is set to <code class="literal">true</code>.
Once set, the failed message is resubmitted to the same handler and loops continuously until the handler throws <code class="literal">AmqpRejectAndDontRequeueException</code>,
essentially allowing you to build your own re-try logic within the handler itself.</p></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_retry_template" href="#_retry_template"></a>27.4.3&nbsp;Retry Template</h3></div></div></div><p>The <code class="literal">RetryTemplate</code> is part of the <a class="link" href="https://github.com/spring-projects/spring-retry" target="_top">Spring Retry</a> library.
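</p><p>As a rough sketch, the retry-related consumer properties described in this section are set per binding. The following fragment assumes a binding named <code class="literal">input</code> (as in the earlier examples) and uses purely illustrative values:</p><pre class="programlisting">spring.cloud.stream.bindings.input.consumer.maxAttempts=5
spring.cloud.stream.bindings.input.consumer.backOffInitialInterval=2000
spring.cloud.stream.bindings.input.consumer.backOffMaxInterval=20000
spring.cloud.stream.bindings.input.consumer.backOffMultiplier=3.0</pre><p>With values such as these, and assuming the usual exponential back-off behavior, the wait before each retry would start at 2 seconds and grow by a factor of 3 until it is capped at 20 seconds.</p><p>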
While it is out of scope of this document to cover all of the capabilities of the <code class="literal">RetryTemplate</code>, we will mention the following consumer properties that are specifically related to
the <code class="literal">RetryTemplate</code>:</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">maxAttempts</span></dt><dd><p class="simpara">The number of attempts to process the message.</p><p class="simpara">Default: 3.</p></dd><dt><span class="term">backOffInitialInterval</span></dt><dd><p class="simpara">The backoff initial interval on retry.</p><p class="simpara">Default: 1000 milliseconds.</p></dd><dt><span class="term">backOffMaxInterval</span></dt><dd><p class="simpara">The maximum backoff interval.</p><p class="simpara">Default: 10000 milliseconds.</p></dd><dt><span class="term">backOffMultiplier</span></dt><dd><p class="simpara">The backoff multiplier.</p><p class="simpara">Default: 2.0.</p></dd></dl></div><p>While the preceding settings are sufficient for the majority of customization requirements, they may not satisfy certain complex requirements, at which
point you may want to provide your own instance of the <code class="literal">RetryTemplate</code>. To do so, configure it as a bean in your application configuration. The application-provided
instance will override the one provided by the framework. Also, to avoid conflicts you must qualify the instance of the <code class="literal">RetryTemplate</code> you want to be used by the binder
as <code class="literal">@StreamRetryTemplate</code>. For example,</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@StreamRetryTemplate</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> RetryTemplate myRetryTemplate() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> RetryTemplate();
}</pre><p>As you can see from the above example you don&#8217;t need to annotate it with <code class="literal">@Bean</code> since <code class="literal">@StreamRetryTemplate</code> is a qualified <code class="literal">@Bean</code>.</p></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="spring-cloud-stream-overview-reactive-programming-support" href="#spring-cloud-stream-overview-reactive-programming-support"></a>27.5&nbsp;Reactive Programming Support</h2></div></div></div><p>Spring Cloud Stream also supports the use of reactive APIs where incoming and outgoing data is handled as continuous data flows.
Support for reactive APIs is available through <code class="literal">spring-cloud-stream-reactive</code>, which needs to be added explicitly to your project.</p><p>The programming model with reactive APIs is declarative. Instead of specifying how each individual message should be handled, you can use operators that describe functional transformations from inbound to outbound data flows.</p><p>At present, Spring Cloud Stream supports only the <a class="link" href="https://projectreactor.io/" target="_top">Reactor API</a>.
In the future, we intend to support a more generic model based on Reactive Streams.</p><p>The reactive programming model also uses the <code class="literal">@StreamListener</code> annotation for setting up reactive handlers.
The differences are that:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">The <code class="literal">@StreamListener</code> annotation must not specify an input or output, as they are provided as arguments and return values from the method.</li><li class="listitem">The arguments of the method must be annotated with <code class="literal">@Input</code> and <code class="literal">@Output</code>, indicating which input or output the incoming and outgoing data flows connect to, respectively.</li><li class="listitem">The return value of the method, if any, is annotated with <code class="literal">@Output</code>, indicating the output where data should be sent.</li></ul></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>Reactive programming support requires Java 1.8.</p></td></tr></table></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>As of Spring Cloud Stream 1.1.1 and later (starting with release train Brooklyn.SR2), reactive programming support requires the use of Reactor 3.0.4.RELEASE and higher.
Earlier Reactor versions (including 3.0.1.RELEASE, 3.0.2.RELEASE and 3.0.3.RELEASE) are not supported.
<code class="literal">spring-cloud-stream-reactive</code> transitively retrieves the proper version, but it is possible for the project structure to manage the version of the <code class="literal">io.projectreactor:reactor-core</code> to an earlier release, especially when using Maven.
This is the case for projects generated by using Spring Initializr with Spring Boot 1.x, which overrides the Reactor version to <code class="literal">2.0.8.RELEASE</code>.
In such cases, you must ensure that the proper version of the artifact is resolved.
You can do so by adding a direct dependency on <code class="literal">io.projectreactor:reactor-core</code> with a version of <code class="literal">3.0.4.RELEASE</code> or later to your project.</p></td></tr></table></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>The term <span class="quote">&#8220;<span class="quote">reactive</span>&#8221;</span> currently refers to the reactive APIs being used and not to the execution model being reactive (that is, the bound endpoints still use a 'push' rather than a 'pull' model). While some backpressure support is provided by the use of Reactor, we do intend, in a future release, to support entirely reactive pipelines by the use of native reactive clients for the connected middleware.</p></td></tr></table></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_reactor_based_handlers" href="#_reactor_based_handlers"></a>27.5.1&nbsp;Reactor-based Handlers</h3></div></div></div><p>A Reactor-based handler can have the following argument types:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">For arguments annotated with <code class="literal">@Input</code>, it supports the Reactor <code class="literal">Flux</code> type.
The parameterization of the inbound Flux follows the same rules as in the case of individual message handling: It can be the entire <code class="literal">Message</code>, a POJO that can be the <code class="literal">Message</code> payload, or a POJO that is the result of a transformation based on the <code class="literal">Message</code> content-type header. Multiple inputs are supported.</li><li class="listitem">For arguments annotated with <code class="literal">@Output</code>, it supports the <code class="literal">FluxSender</code> type, which connects a <code class="literal">Flux</code> produced by the method with an output. Generally speaking, specifying outputs as arguments is only recommended when the method can have multiple outputs.</li></ul></div><p>A Reactor-based handler supports a return type of <code class="literal">Flux</code>. In that case, it must be annotated with <code class="literal">@Output</code>. We recommend using the return value of the method when a single output <code class="literal">Flux</code> is available.</p><p>The following example shows a Reactor-based <code class="literal">Processor</code>:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> UppercaseTransformer {
<em><span class="hl-annotation" style="color: gray">@StreamListener</span></em>
<em><span class="hl-annotation" style="color: gray">@Output(Processor.OUTPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Flux&lt;String&gt; receive(<em><span class="hl-annotation" style="color: gray">@Input(Processor.INPUT)</span></em> Flux&lt;String&gt; input) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> input.map(s -&gt; s.toUpperCase());
}
}</pre><p>The same processor using output arguments looks like the following example:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Processor.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> UppercaseTransformer {
<em><span class="hl-annotation" style="color: gray">@StreamListener</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> receive(<em><span class="hl-annotation" style="color: gray">@Input(Processor.INPUT)</span></em> Flux&lt;String&gt; input,
<em><span class="hl-annotation" style="color: gray">@Output(Processor.OUTPUT)</span></em> FluxSender output) {
output.send(input.map(s -&gt; s.toUpperCase()));
}
}</pre></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_reactive_sources" href="#_reactive_sources"></a>27.5.2&nbsp;Reactive Sources</h3></div></div></div><p>Spring Cloud Stream reactive support also provides the ability for creating reactive sources through the <code class="literal">@StreamEmitter</code> annotation.
By using the <code class="literal">@StreamEmitter</code> annotation, a regular source may be converted to a reactive one.
<code class="literal">@StreamEmitter</code> is a method level annotation that marks a method to be an emitter to outputs declared with <code class="literal">@EnableBinding</code>.
You cannot use the <code class="literal">@Input</code> annotation along with <code class="literal">@StreamEmitter</code>, as the methods marked with this annotation are not listening for any input. Rather, methods marked with <code class="literal">@StreamEmitter</code> generate output.
Following the same programming model used in <code class="literal">@StreamListener</code>, <code class="literal">@StreamEmitter</code> also allows flexible ways of using the <code class="literal">@Output</code> annotation, depending on whether the method has any arguments, a return type, and other considerations.</p><p>The remainder of this section contains examples of using the <code class="literal">@StreamEmitter</code> annotation in various styles.</p><p>The following example emits the <code class="literal">Hello World</code> message every millisecond and publishes to a Reactor <code class="literal">Flux</code>:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> HelloWorldEmitter {
<em><span class="hl-annotation" style="color: gray">@StreamEmitter</span></em>
<em><span class="hl-annotation" style="color: gray">@Output(Source.OUTPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Flux&lt;String&gt; emit() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> Flux.interval(Duration.ofMillis(<span class="hl-number">1</span>))
.map(l -&gt; <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hello World"</span>);
}
}</pre><p>In the preceding example, the resulting messages in the <code class="literal">Flux</code> are sent to the output channel of the <code class="literal">Source</code>.</p><p>The next example is another flavor of an <code class="literal">@StreamEmitter</code> that sends a Reactor <code class="literal">Flux</code>.
Instead of returning a <code class="literal">Flux</code>, the following method uses a <code class="literal">FluxSender</code> to programmatically send a <code class="literal">Flux</code> from a source:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> HelloWorldEmitter {
    <em><span class="hl-annotation" style="color: gray">@StreamEmitter</span></em>
    <em><span class="hl-annotation" style="color: gray">@Output(Source.OUTPUT)</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> emit(FluxSender output) {
        // Hand the Flux to the FluxSender instead of returning it
        output.send(Flux.interval(Duration.ofMillis(<span class="hl-number">1</span>))
                .map(l -&gt; <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hello World"</span>));
    }
}</pre><p>The next example is functionally identical to the preceding snippet.
However, instead of using an explicit <code class="literal">@Output</code> annotation on the method, it uses the annotation on the method parameter.</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> HelloWorldEmitter {
    <em><span class="hl-annotation" style="color: gray">@StreamEmitter</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> emit(<em><span class="hl-annotation" style="color: gray">@Output(Source.OUTPUT)</span></em> FluxSender output) {
        // The @Output annotation on the parameter selects the target channel
        output.send(Flux.interval(Duration.ofMillis(<span class="hl-number">1</span>))
                .map(l -&gt; <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hello World"</span>));
    }
}</pre><p>The last example in this section is yet another flavor of writing reactive sources. It uses the Reactive Streams <code class="literal">Publisher</code> API and takes advantage of the support for it in the <a class="link" href="https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference" target="_top">Spring Integration Java DSL</a>.
The <code class="literal">Publisher</code> in the following example still uses Reactor <code class="literal">Flux</code> under the hood, but that is transparent to the application, which needs only the Reactive Streams API and the Java DSL for Spring Integration:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableAutoConfiguration</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> HelloWorldEmitter {
    <em><span class="hl-annotation" style="color: gray">@StreamEmitter</span></em>
    <em><span class="hl-annotation" style="color: gray">@Output(Source.OUTPUT)</span></em>
    <em><span class="hl-annotation" style="color: gray">@Bean</span></em>
    <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Publisher&lt;Message&lt;String&gt;&gt; emit() {
        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> IntegrationFlows.from(() -&gt;
                        <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> GenericMessage&lt;&gt;(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hello World"</span>),
                e -&gt; e.poller(p -&gt; p.fixedDelay(<span class="hl-number">1</span>)))
                .toReactivePublisher();
    }
}</pre></div></div></div><div class="navfooter"><hr><table width="100%" summary="Navigation footer"><tr><td width="40%" align="left"><a accesskey="p" href="multi__main_concepts.html">Prev</a>&nbsp;</td><td width="20%" align="center"><a accesskey="u" href="multi__spring_cloud_stream.html">Up</a></td><td width="40%" align="right">&nbsp;<a accesskey="n" href="multi_spring-cloud-stream-overview-binders.html">Next</a></td></tr><tr><td width="40%" align="left" valign="top">26.&nbsp;Main Concepts&nbsp;</td><td width="20%" align="center"><a accesskey="h" href="multi_spring-cloud.html">Home</a></td><td width="40%" align="right" valign="top">&nbsp;28.&nbsp;Binders</td></tr></table></div></body></html>