Files
spring-cloud-static/Greenwich.M3/multi/multi__rabbitmq_binder.html
2018-11-27 10:29:13 -05:00

428 lines
84 KiB
HTML

<html><head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>40.&nbsp;RabbitMQ Binder</title><link rel="stylesheet" type="text/css" href="css/manual-multipage.css"><meta name="generator" content="DocBook XSL Stylesheets V1.78.1"><link rel="home" href="multi_spring-cloud.html" title="Spring Cloud"><link rel="up" href="multi__binder_implementations.html" title="Part&nbsp;VI.&nbsp;Binder Implementations"><link rel="prev" href="multi__apache_kafka_streams_binder.html" title="39.&nbsp;Apache Kafka Streams Binder"><link rel="next" href="multi__spring_cloud_bus.html" title="Part&nbsp;VII.&nbsp;Spring Cloud Bus"></head><body bgcolor="white" text="black" link="#0000FF" vlink="#840084" alink="#0000FF"><div class="navheader"><table width="100%" summary="Navigation header"><tr><th colspan="3" align="center">40.&nbsp;RabbitMQ Binder</th></tr><tr><td width="20%" align="left"><a accesskey="p" href="multi__apache_kafka_streams_binder.html">Prev</a>&nbsp;</td><th width="60%" align="center">Part&nbsp;VI.&nbsp;Binder Implementations</th><td width="20%" align="right">&nbsp;<a accesskey="n" href="multi__spring_cloud_bus.html">Next</a></td></tr></table><hr></div><div class="chapter"><div class="titlepage"><div><div><h2 class="title"><a name="_rabbitmq_binder" href="#_rabbitmq_binder"></a>40.&nbsp;RabbitMQ Binder</h2></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_usage_3" href="#_usage_3"></a>40.1&nbsp;Usage</h2></div></div></div><p>To use the RabbitMQ binder, you can add it to your Spring Cloud Stream application, by using the following Maven coordinates:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;dependency&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;groupId&gt;</span>org.springframework.cloud<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/groupId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;artifactId&gt;</span>spring-cloud-stream-binder-rabbit<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/artifactId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/dependency&gt;</span></pre><p>Alternatively, you can use the Spring Cloud Stream RabbitMQ Starter, as follows:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;dependency&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;groupId&gt;</span>org.springframework.cloud<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/groupId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;artifactId&gt;</span>spring-cloud-starter-stream-rabbit<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/artifactId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/dependency&gt;</span></pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_rabbitmq_binder_overview" href="#_rabbitmq_binder_overview"></a>40.2&nbsp;RabbitMQ Binder Overview</h2></div></div></div><p>The following simplified diagram shows how the RabbitMQ binder operates:</p><div class="figure"><a name="d0e13399" href="#d0e13399"></a><p class="title"><b>Figure&nbsp;40.1.&nbsp;RabbitMQ Binder</b></p><div class="figure-contents"><div class="mediaobject"><img src="images/rabbit-binder.png" alt="rabbit binder"></div></div></div><br class="figure-break"><p>By default, the RabbitMQ Binder implementation maps each destination to a <code class="literal">TopicExchange</code>.
For each consumer group, a <code class="literal">Queue</code> is bound to that <code class="literal">TopicExchange</code>.
Each consumer instance has a corresponding RabbitMQ <code class="literal">Consumer</code> instance for its group&#8217;s <code class="literal">Queue</code>.
For partitioned producers and consumers, the queues are suffixed with the partition index and use the partition index as the routing key.
For anonymous consumers (those with no <code class="literal">group</code> property), an auto-delete queue (with a randomized unique name) is used.</p><p>By using the optional <code class="literal">autoBindDlq</code> option, you can configure the binder to create and configure dead-letter queues (DLQs) (and a dead-letter exchange <code class="literal">DLX</code>, as well as routing infrastructure).
By default, the dead letter queue has the name of the destination, appended with <code class="literal">.dlq</code>.
If retry is enabled (<code class="literal">maxAttempts &gt; 1</code>), failed messages are delivered to the DLQ after retries are exhausted.
If retry is disabled (<code class="literal">maxAttempts = 1</code>), you should set <code class="literal">requeueRejected</code> to <code class="literal">false</code> (the default) so that failed messages are routed to the DLQ, instead of being re-queued.
In addition, <code class="literal">republishToDlq</code> causes the binder to publish a failed message to the DLQ (instead of rejecting it).
This feature lets additional information (such as the stack trace in the <code class="literal">x-exception-stacktrace</code> header) be added to the message in headers.
This option does not need retry enabled.
You can republish a failed message after just one attempt.
Starting with version 1.2, you can configure the delivery mode of republished messages.
See the <a class="link" href="multi__rabbitmq_binder.html#spring-cloud-stream-rabbit-republish-delivery-mode"><code class="literal">republishDeliveryMode</code> property</a>.</p><div class="important" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Important"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Important]" src="images/important.png"></td><th align="left">Important</th></tr><tr><td align="left" valign="top"><p>Setting <code class="literal">requeueRejected</code> to <code class="literal">true</code> (with <code class="literal">republishToDlq=false</code> ) causes the message to be re-queued and redelivered continually, which is likely not what you want unless the reason for the failure is transient.
In general, you should enable retry within the binder by setting <code class="literal">maxAttempts</code> to greater than one or by setting <code class="literal">republishToDlq</code> to <code class="literal">true</code>.</p></td></tr></table></div><p>See <a class="xref" href="multi__rabbitmq_binder.html#rabbit-binder-properties" title="40.3.1&nbsp;RabbitMQ Binder Properties">Section&nbsp;40.3.1, &#8220;RabbitMQ Binder Properties&#8221;</a> for more information about these properties.</p><p>The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue).
Some options are described in <a class="xref" href="multi__rabbitmq_binder.html#rabbit-dlq-processing" title="40.6&nbsp;Dead-Letter Queue Processing">Section&nbsp;40.6, &#8220;Dead-Letter Queue Processing&#8221;</a>.</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>When multiple RabbitMQ binders are used in a Spring Cloud Stream application, it is important to disable 'RabbitAutoConfiguration' to avoid the same configuration from <code class="literal">RabbitAutoConfiguration</code> being applied to the two binders.
You can exclude the class by using the <code class="literal">@SpringBootApplication</code> annotation.</p></td></tr></table></div><p>Starting with version 2.0, the <code class="literal">RabbitMessageChannelBinder</code> sets the <code class="literal">RabbitTemplate.userPublisherConnection</code> property to <code class="literal">true</code> so that the non-transactional producers avoid deadlocks on consumers, which can happen if cached connections are blocked because of a <a class="link" href="https://www.rabbitmq.com/memory.html" target="_top">memory alarm</a> on the broker.</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>Currently, a <code class="literal">multiplex</code> consumer (a single consumer listening to multiple queues) is only supported for message-driven conssumers; polled consumers can only retrieve messages from a single queue.</p></td></tr></table></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_configuration_options_4" href="#_configuration_options_4"></a>40.3&nbsp;Configuration Options</h2></div></div></div><p>This section contains settings specific to the RabbitMQ Binder and bound channels.</p><p>For general binding configuration options and properties, see the <a class="link" href="https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc#configuration-options" target="_top">Spring Cloud Stream core documentation</a>.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="rabbit-binder-properties" href="#rabbit-binder-properties"></a>40.3.1&nbsp;RabbitMQ Binder Properties</h3></div></div></div><p>By default, the RabbitMQ binder uses Spring Boot&#8217;s <code class="literal">ConnectionFactory</code>.
Conseuqently, it supports all Spring Boot configuration options for RabbitMQ.
(For reference, see the <a class="link" href="http://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#common-application-properties" target="_top">Spring Boot documentation</a>).
RabbitMQ configuration options use the <code class="literal">spring.rabbitmq</code> prefix.</p><p>In addition to Spring Boot options, the RabbitMQ binder supports the following properties:</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">spring.cloud.stream.rabbit.binder.adminAddresses</span></dt><dd><p class="simpara">A comma-separated list of RabbitMQ management plugin URLs.
Only used when <code class="literal">nodes</code> contains more than one entry.
Each entry in this list must have a corresponding entry in <code class="literal">spring.rabbitmq.addresses</code>.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue.
See <a class="link" href="https://docs.spring.io/spring-amqp/reference/html/_reference.html#queue-affinity" target="_top">Queue Affinity and the LocalizedQueueConnectionFactory</a> for more information.</p><p class="simpara">Default: empty.</p></dd><dt><span class="term">spring.cloud.stream.rabbit.binder.nodes</span></dt><dd><p class="simpara">A comma-separated list of RabbitMQ node names.
When more than one entry, used to locate the server address where a queue is located.
Each entry in this list must have a corresponding entry in <code class="literal">spring.rabbitmq.addresses</code>.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue.
See <a class="link" href="https://docs.spring.io/spring-amqp/reference/html/_reference.html#queue-affinity" target="_top">Queue Affinity and the LocalizedQueueConnectionFactory</a> for more information.</p><p class="simpara">Default: empty.</p></dd><dt><span class="term">spring.cloud.stream.rabbit.binder.compressionLevel</span></dt><dd><p class="simpara">The compression level for compressed bindings.
See <code class="literal">java.util.zip.Deflater</code>.</p><p class="simpara">Default: <code class="literal">1</code> (BEST_LEVEL).</p></dd><dt><span class="term">spring.cloud.stream.binder.connection-name-prefix</span></dt><dd><p class="simpara">A connection name prefix used to name the connection(s) created by this binder.
The name is this prefix followed by <code class="literal">#n</code>, where <code class="literal">n</code> increments each time a new connection is opened.</p><p class="simpara">Default: none (Spring AMQP default).</p></dd></dl></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_rabbitmq_consumer_properties" href="#_rabbitmq_consumer_properties"></a>40.3.2&nbsp;RabbitMQ Consumer Properties</h3></div></div></div><p>The following properties are available for Rabbit consumers only and must be prefixed with <code class="literal">spring.cloud.stream.rabbit.bindings.&lt;channelName&gt;.consumer.</code>.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">acknowledgeMode</span></dt><dd><p class="simpara">The acknowledge mode.</p><p class="simpara">Default: <code class="literal">AUTO</code>.</p></dd><dt><span class="term">autoBindDlq</span></dt><dd><p class="simpara">Whether to automatically declare the DLQ and bind it to the binder DLX.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">bindingRoutingKey</span></dt><dd><p class="simpara">The routing key with which to bind the queue to the exchange (if <code class="literal">bindQueue</code> is <code class="literal">true</code>).
For partitioned destinations, <code class="literal">-&lt;instanceIndex&gt;</code> is appended.</p><p class="simpara">Default: <code class="literal">#</code>.</p></dd><dt><span class="term">bindQueue</span></dt><dd><p class="simpara">Whether to bind the queue to the destination exchange.
Set it to <code class="literal">false</code> if you have set up your own infrastructure and have previously created and bound the queue.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">consumerTagPrefix</span></dt><dd><p class="simpara">Used to create the consumer tag(s); will be appended by <code class="literal">#n</code> where <code class="literal">n</code> increments for each consumer created.
Example: <code class="literal">${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}</code>.</p><p class="simpara">Default: none - the broker will generate random consumer tags.</p></dd><dt><span class="term">deadLetterQueueName</span></dt><dd><p class="simpara">The name of the DLQ</p><p class="simpara">Default: <code class="literal">prefix+destination.dlq</code></p></dd><dt><span class="term">deadLetterExchange</span></dt><dd><p class="simpara">A DLX to assign to the queue.
Relevant only if <code class="literal">autoBindDlq</code> is <code class="literal">true</code>.</p><p class="simpara">Default: 'prefix+DLX'</p></dd><dt><span class="term">deadLetterExchangeType</span></dt><dd><p class="simpara">The type of the DLX to assign to the queue.
Relevant only if <code class="literal">autoBindDlq</code> is <code class="literal">true</code>.</p><p class="simpara">Default: 'direct'</p></dd><dt><span class="term">deadLetterRoutingKey</span></dt><dd><p class="simpara">A dead letter routing key to assign to the queue.
Relevant only if <code class="literal">autoBindDlq</code> is <code class="literal">true</code>.</p><p class="simpara">Default: <code class="literal">destination</code></p></dd><dt><span class="term">declareDlx</span></dt><dd><p class="simpara">Whether to declare the dead letter exchange for the destination.
Relevant only if <code class="literal">autoBindDlq</code> is <code class="literal">true</code>.
Set to <code class="literal">false</code> if you have a pre-configured DLX.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">declareExchange</span></dt><dd><p class="simpara">Whether to declare the exchange for the destination.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">delayedExchange</span></dt><dd><p class="simpara">Whether to declare the exchange as a <code class="literal">Delayed Message Exchange</code>.
Requires the delayed message exchange plugin on the broker.
The <code class="literal">x-delayed-type</code> argument is set to the <code class="literal">exchangeType</code>.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">dlqDeadLetterExchange</span></dt><dd><p class="simpara">If a DLQ is declared, a DLX to assign to that queue.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">dlqDeadLetterRoutingKey</span></dt><dd><p class="simpara">If a DLQ is declared, a dead letter routing key to assign to that queue.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">dlqExpires</span></dt><dd><p class="simpara">How long before an unused dead letter queue is deleted (in milliseconds).</p><p class="simpara">Default: <code class="literal">no expiration</code></p></dd><dt><span class="term">dlqLazy</span></dt><dd><p class="simpara">Declare the dead letter queue with the <code class="literal">x-queue-mode=lazy</code> argument.
See <a class="link" href="https://www.rabbitmq.com/lazy-queues.html" target="_top">&#8220;Lazy Queues&#8221;</a>.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">dlqMaxLength</span></dt><dd><p class="simpara">Maximum number of messages in the dead letter queue.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">dlqMaxLengthBytes</span></dt><dd><p class="simpara">Maximum number of total bytes in the dead letter queue from all messages.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">dlqMaxPriority</span></dt><dd><p class="simpara">Maximum priority of messages in the dead letter queue (0-255).</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">dlqOverflowBehavior</span></dt><dd><p class="simpara">Action to take when <code class="literal">dlqMaxLength</code> or <code class="literal">dlqMaxLengthBytes</code> is exceeded; currently <code class="literal">drop-head</code> or <code class="literal">reject-publish</code> but refer to the RabbitMQ documentation.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">dlqTtl</span></dt><dd><p class="simpara">Default time to live to apply to the dead letter queue when declared (in milliseconds).</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">durableSubscription</span></dt><dd><p class="simpara">Whether the subscription should be durable.
Only effective if <code class="literal">group</code> is also set.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">exchangeAutoDelete</span></dt><dd><p class="simpara">If <code class="literal">declareExchange</code> is true, whether the exchange should be auto-deleted (that is, removed after the last queue is removed).</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">exchangeDurable</span></dt><dd><p class="simpara">If <code class="literal">declareExchange</code> is true, whether the exchange should be durable (that is, it survives broker restart).</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">exchangeType</span></dt><dd><p class="simpara">The exchange type: <code class="literal">direct</code>, <code class="literal">fanout</code> or <code class="literal">topic</code> for non-partitioned destinations and <code class="literal">direct</code> or <code class="literal">topic</code> for partitioned destinations.</p><p class="simpara">Default: <code class="literal">topic</code>.</p></dd><dt><span class="term">exclusive</span></dt><dd><p class="simpara">Whether to create an exclusive consumer.
Concurrency should be 1 when this is <code class="literal">true</code>.
Often used when strict ordering is required but enabling a hot standby instance to take over after a failure.
See <code class="literal">recoveryInterval</code>, which controls how often a standby instance attempts to consume.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">expires</span></dt><dd><p class="simpara">How long before an unused queue is deleted (in milliseconds).</p><p class="simpara">Default: <code class="literal">no expiration</code></p></dd><dt><span class="term">failedDeclarationRetryInterval</span></dt><dd><p class="simpara">The interval (in milliseconds) between attempts to consume from a queue if it is missing.</p><p class="simpara">Default: 5000</p></dd><dt><span class="term">headerPatterns</span></dt><dd><p class="simpara">Patterns for headers to be mapped from inbound messages.</p><p class="simpara">Default: <code class="literal">['*']</code> (all headers).</p></dd><dt><span class="term">lazy</span></dt><dd><p class="simpara">Declare the queue with the <code class="literal">x-queue-mode=lazy</code> argument.
See <a class="link" href="https://www.rabbitmq.com/lazy-queues.html" target="_top">&#8220;Lazy Queues&#8221;</a>.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">maxConcurrency</span></dt><dd><p class="simpara">The maximum number of consumers.</p><p class="simpara">Default: <code class="literal">1</code>.</p></dd><dt><span class="term">maxLength</span></dt><dd><p class="simpara">The maximum number of messages in the queue.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">maxLengthBytes</span></dt><dd><p class="simpara">The maximum number of total bytes in the queue from all messages.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">maxPriority</span></dt><dd><p class="simpara">The maximum priority of messages in the queue (0-255).</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">missingQueuesFatal</span></dt><dd><p class="simpara">When the queue cannot be found, whether to treat the condition as fatal and stop the listener container.
Defaults to <code class="literal">false</code> so that the container keeps trying to consume from the queue&#8201;&#8212;&#8201;for example, when using a cluster and the node hosting a non-HA queue is down.</p><p class="simpara">Default: <code class="literal">false</code></p></dd><dt><span class="term">overflowBehavior</span></dt><dd><p class="simpara">Action to take when <code class="literal">maxLength</code> or <code class="literal">maxLengthBytes</code> is exceeded; currently <code class="literal">drop-head</code> or <code class="literal">reject-publish</code> but refer to the RabbitMQ documentation.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">prefetch</span></dt><dd><p class="simpara">Prefetch count.</p><p class="simpara">Default: <code class="literal">1</code>.</p></dd><dt><span class="term">prefix</span></dt><dd><p class="simpara">A prefix to be added to the name of the <code class="literal">destination</code> and queues.</p><p class="simpara">Default: "".</p></dd><dt><span class="term">queueDeclarationRetries</span></dt><dd><p class="simpara">The number of times to retry consuming from a queue if it is missing.
Relevant only when <code class="literal">missingQueuesFatal</code> is <code class="literal">true</code>.
Otherwise, the container keeps retrying indefinitely.</p><p class="simpara">Default: <code class="literal">3</code></p></dd><dt><span class="term">queueNameGroupOnly</span></dt><dd><p class="simpara">When true, consume from a queue with a name equal to the <code class="literal">group</code>.
Otherwise the queue name is <code class="literal">destination.group</code>.
This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.</p><p class="simpara">Default: false.</p></dd><dt><span class="term">recoveryInterval</span></dt><dd><p class="simpara">The interval between connection recovery attempts, in milliseconds.</p><p class="simpara">Default: <code class="literal">5000</code>.</p></dd><dt><span class="term">requeueRejected</span></dt><dd><p class="simpara">Whether delivery failures should be re-queued when retry is disabled or <code class="literal">republishToDlq</code> is <code class="literal">false</code>.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd></dl></div><div class="variablelist"><a name="spring-cloud-stream-rabbit-republish-delivery-mode" href="#spring-cloud-stream-rabbit-republish-delivery-mode"></a><dl class="variablelist"><dt><span class="term">republishDeliveryMode</span></dt><dd><p class="simpara">When <code class="literal">republishToDlq</code> is <code class="literal">true</code>, specifies the delivery mode of the republished message.</p><p class="simpara">Default: <code class="literal">DeliveryMode.PERSISTENT</code></p></dd><dt><span class="term">republishToDlq</span></dt><dd><p class="simpara">By default, messages that fail after retries are exhausted are rejected.
If a dead-letter queue (DLQ) is configured, RabbitMQ routes the failed message (unchanged) to the DLQ.
If set to <code class="literal">true</code>, the binder republishs failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure.</p><p class="simpara">Default: false</p></dd><dt><span class="term">transacted</span></dt><dd><p class="simpara">Whether to use transacted channels.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">ttl</span></dt><dd><p class="simpara">Default time to live to apply to the queue when declared (in milliseconds).</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">txSize</span></dt><dd><p class="simpara">The number of deliveries between acks.</p><p class="simpara">Default: <code class="literal">1</code>.</p></dd></dl></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_advanced_listener_container_configuration" href="#_advanced_listener_container_configuration"></a>40.3.3&nbsp;Advanced Listener Container Configuration</h3></div></div></div><p>To set listener container properties that are not exposed as binder or binding properties, add a single bean of type <code class="literal">ListenerContainerCustomizer</code> to the application context.
The binder and binding properties will be set and then the customizer will be called.
The customizer (<code class="literal">configure()</code> method) is provided with the queue name as well as the consumer group as arguments.</p></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_rabbit_producer_properties" href="#_rabbit_producer_properties"></a>40.3.4&nbsp;Rabbit Producer Properties</h3></div></div></div><p>The following properties are available for Rabbit producers only and
must be prefixed with <code class="literal">spring.cloud.stream.rabbit.bindings.&lt;channelName&gt;.producer.</code>.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">autoBindDlq</span></dt><dd><p class="simpara">Whether to automatically declare the DLQ and bind it to the binder DLX.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">batchingEnabled</span></dt><dd><p class="simpara">Whether to enable message batching by producers.
Messages are batched into one message according to the following properties (described in the next three entries in this list): 'batchSize', <code class="literal">batchBufferLimit</code>, and <code class="literal">batchTimeout</code>.
See <a class="link" href="https://docs.spring.io/spring-amqp//reference/html/_reference.html#template-batching" target="_top">Batching</a> for more information.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">batchSize</span></dt><dd><p class="simpara">The number of messages to buffer when batching is enabled.</p><p class="simpara">Default: <code class="literal">100</code>.</p></dd><dt><span class="term">batchBufferLimit</span></dt><dd><p class="simpara">The maximum buffer size when batching is enabled.</p><p class="simpara">Default: <code class="literal">10000</code>.</p></dd><dt><span class="term">batchTimeout</span></dt><dd><p class="simpara">The batch timeout when batching is enabled.</p><p class="simpara">Default: <code class="literal">5000</code>.</p></dd><dt><span class="term">bindingRoutingKey</span></dt><dd><p class="simpara">The routing key with which to bind the queue to the exchange (if <code class="literal">bindQueue</code> is <code class="literal">true</code>).
Only applies to non-partitioned destinations.
Only applies if <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">#</code>.</p></dd><dt><span class="term">bindQueue</span></dt><dd><p class="simpara">Whether to bind the queue to the destination exchange.
Set it to <code class="literal">false</code> if you have set up your own infrastructure and have previously created and bound the queue.
Only applies if <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">compress</span></dt><dd><p class="simpara">Whether data should be compressed when sent.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">deadLetterQueueName</span></dt><dd><p class="simpara">The name of the DLQ
Only applies if <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">prefix+destination.dlq</code></p></dd><dt><span class="term">deadLetterExchange</span></dt><dd><p class="simpara">A DLX to assign to the queue.
Relevant only when <code class="literal">autoBindDlq</code> is <code class="literal">true</code>.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: 'prefix+DLX'</p></dd><dt><span class="term">deadLetterExchangeType</span></dt><dd><p class="simpara">The type of the DLX to assign to the queue.
Relevant only if <code class="literal">autoBindDlq</code> is <code class="literal">true</code>.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: 'direct'</p></dd><dt><span class="term">deadLetterRoutingKey</span></dt><dd><p class="simpara">A dead letter routing key to assign to the queue.
Relevant only when <code class="literal">autoBindDlq</code> is <code class="literal">true</code>.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">destination</code></p></dd><dt><span class="term">declareDlx</span></dt><dd><p class="simpara">Whether to declare the dead letter exchange for the destination.
Relevant only if <code class="literal">autoBindDlq</code> is <code class="literal">true</code>.
Set to <code class="literal">false</code> if you have a pre-configured DLX.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">declareExchange</span></dt><dd><p class="simpara">Whether to declare the exchange for the destination.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">delayExpression</span></dt><dd><p class="simpara">A SpEL expression to evaluate the delay to apply to the message (<code class="literal">x-delay</code> header).
It has no effect if the exchange is not a delayed message exchange.</p><p class="simpara">Default: No <code class="literal">x-delay</code> header is set.</p></dd><dt><span class="term">delayedExchange</span></dt><dd><p class="simpara">Whether to declare the exchange as a <code class="literal">Delayed Message Exchange</code>.
Requires the delayed message exchange plugin on the broker.
The <code class="literal">x-delayed-type</code> argument is set to the <code class="literal">exchangeType</code>.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">deliveryMode</span></dt><dd><p class="simpara">The delivery mode.</p><p class="simpara">Default: <code class="literal">PERSISTENT</code>.</p></dd><dt><span class="term">dlqDeadLetterExchange</span></dt><dd><p class="simpara">When a DLQ is declared, a DLX to assign to that queue.
Applies only if <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">dlqDeadLetterRoutingKey</span></dt><dd><p class="simpara">When a DLQ is declared, a dead letter routing key to assign to that queue.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">dlqExpires</span></dt><dd><p class="simpara">How long (in milliseconds) before an unused dead letter queue is deleted.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">no expiration</code></p></dd><dt><span class="term">dlqLazy</span></dt><dd>Declare the dead letter queue with the <code class="literal">x-queue-mode=lazy</code> argument.
See <a class="link" href="https://www.rabbitmq.com/lazy-queues.html" target="_top">&#8220;Lazy Queues&#8221;</a>.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</dd><dt><span class="term">dlqMaxLength</span></dt><dd><p class="simpara">Maximum number of messages in the dead letter queue.
Applies only if <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">dlqMaxLengthBytes</span></dt><dd><p class="simpara">Maximum number of total bytes in the dead letter queue from all messages.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">dlqMaxPriority</span></dt><dd><p class="simpara">Maximum priority of messages in the dead letter queue (0-255)
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">dlqTtl</span></dt><dd><p class="simpara">Default time (in milliseconds) to live to apply to the dead letter queue when declared.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">exchangeAutoDelete</span></dt><dd><p class="simpara">If <code class="literal">declareExchange</code> is <code class="literal">true</code>, whether the exchange should be auto-delete (it is removed after the last queue is removed).</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">exchangeDurable</span></dt><dd><p class="simpara">If <code class="literal">declareExchange</code> is <code class="literal">true</code>, whether the exchange should be durable (survives broker restart).</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">exchangeType</span></dt><dd><p class="simpara">The exchange type: <code class="literal">direct</code>, <code class="literal">fanout</code> or <code class="literal">topic</code> for non-partitioned destinations and <code class="literal">direct</code> or <code class="literal">topic</code> for partitioned destinations.</p><p class="simpara">Default: <code class="literal">topic</code>.</p></dd><dt><span class="term">expires</span></dt><dd><p class="simpara">How long (in milliseconds) before an unused queue is deleted.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">no expiration</code></p></dd><dt><span class="term">headerPatterns</span></dt><dd><p class="simpara">Patterns for headers to be mapped to outbound messages.</p><p class="simpara">Default: <code class="literal">['*']</code> (all headers).</p></dd><dt><span class="term">lazy</span></dt><dd><p class="simpara">Declare the queue with the <code class="literal">x-queue-mode=lazy</code> argument.
See <a class="link" href="https://www.rabbitmq.com/lazy-queues.html" target="_top">&#8220;Lazy Queues&#8221;</a>.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">maxLength</span></dt><dd><p class="simpara">Maximum number of messages in the queue.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">maxLengthBytes</span></dt><dd><p class="simpara">Maximum number of total bytes in the queue from all messages.
Only applies if <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd><dt><span class="term">maxPriority</span></dt><dd><p class="simpara">Maximum priority of messages in the queue (0-255).
Only applies if <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">prefix</span></dt><dd><p class="simpara">A prefix to be added to the name of the <code class="literal">destination</code> exchange.</p><p class="simpara">Default: "".</p></dd><dt><span class="term">queueNameGroupOnly</span></dt><dd><p class="simpara">When <code class="literal">true</code>, consume from a queue with a name equal to the <code class="literal">group</code>.
Otherwise the queue name is <code class="literal">destination.group</code>.
This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: false.</p></dd><dt><span class="term">routingKeyExpression</span></dt><dd><p class="simpara">A SpEL expression to determine the routing key to use when publishing messages.
For a fixed routing key, use a literal expression, such as <code class="literal">routingKeyExpression='my.routingKey'</code> in a properties file or <code class="literal">routingKeyExpression: '''my.routingKey'''</code> in a YAML file.</p><p class="simpara">Default: <code class="literal">destination</code> or <code class="literal">destination-&lt;partition&gt;</code> for partitioned destinations.</p></dd><dt><span class="term">transacted</span></dt><dd><p class="simpara">Whether to use transacted channels.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">ttl</span></dt><dd><p class="simpara">Default time (in milliseconds) to live to apply to the queue when declared.
Applies only when <code class="literal">requiredGroups</code> are provided and then only to those groups.</p><p class="simpara">Default: <code class="literal">no limit</code></p></dd></dl></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>In the case of RabbitMQ, content type headers can be set by external applications.
Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport&#8201;&#8212;&#8201;including transports, such as Kafka (prior to 0.11), that do not natively support headers.</p></td></tr></table></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_retry_with_the_rabbitmq_binder" href="#_retry_with_the_rabbitmq_binder"></a>40.4&nbsp;Retry With the RabbitMQ Binder</h2></div></div></div><p>When retry is enabled within the binder, the listener container thread is suspended for any back off periods that are configured.
This might be important when strict ordering is required with a single consumer. However, for other use cases, it prevents other messages from being processed on that thread.
An alternative to using binder retry is to set up dead lettering with time to live on the dead-letter queue (DLQ) as well as dead-letter configuration on the DLQ itself.
See &#8220;<a class="xref" href="multi__rabbitmq_binder.html#rabbit-binder-properties" title="40.3.1&nbsp;RabbitMQ Binder Properties">Section&nbsp;40.3.1, &#8220;RabbitMQ Binder Properties&#8221;</a>&#8221; for more information about the properties discussed here.
You can use the following example configuration to enable this feature:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">Set <code class="literal">autoBindDlq</code> to <code class="literal">true</code>.
The binder create a DLQ.
Optionally, you can specify a name in <code class="literal">deadLetterQueueName</code>.</li><li class="listitem">Set <code class="literal">dlqTtl</code> to the back off time you want to wait between redeliveries.</li><li class="listitem">Set the <code class="literal">dlqDeadLetterExchange</code> to the default exchange.
Expired messages from the DLQ are routed to the original queue, because the default <code class="literal">deadLetterRoutingKey</code> is the queue name (<code class="literal">destination.group</code>).
Setting to the default exchange is achieved by setting the property with no value, as shown in the next example.</li></ul></div><p>To force a message to be dead-lettered, either throw an <code class="literal">AmqpRejectAndDontRequeueException</code> or set <code class="literal">requeueRejected</code> to <code class="literal">true</code> (the default) and throw any exception.</p><p>The loop continue without end, which is fine for transient problems, but you may want to give up after some number of attempts.
Fortunately, RabbitMQ provides the <code class="literal">x-death</code> header, which lets you determine how many cycles have occurred.</p><p>To acknowledge a message after giving up, throw an <code class="literal">ImmediateAcknowledgeAmqpException</code>.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_putting_it_all_together" href="#_putting_it_all_together"></a>40.4.1&nbsp;Putting it All Together</h3></div></div></div><p>The following configuration creates an exchange <code class="literal">myDestination</code> with queue <code class="literal">myDestination.consumerGroup</code> bound to a topic exchange with a wildcard routing key <code class="literal">#</code>:</p><pre class="screen">---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---</pre><p>This configuration creates a DLQ bound to a direct exchange (<code class="literal">DLX</code>) with a routing key of <code class="literal">myDestination.consumerGroup</code>.
When messages are rejected, they are routed to the DLQ.
After 5 seconds, the message expires and is routed to the original queue by using the queue name as the routing key, as shown in the following example:</p><p><b>Spring Boot application.&nbsp;</b>
</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> XDeathApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
SpringApplication.run(XDeathApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, args);
}
<em><span class="hl-annotation" style="color: gray">@StreamListener(Sink.INPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> listen(String in, <em><span class="hl-annotation" style="color: gray">@Header(name = "x-death", required = false)</span></em> Map&lt;?,?&gt; death) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (death != null &amp;&amp; death.get(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"count"</span>).equals(<span class="hl-number">3L</span>)) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-comment">// giving up - don't send to DLX</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">throw</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> ImmediateAcknowledgeAmqpException(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Failed after 4 attempts"</span>);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">throw</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> AmqpRejectAndDontRequeueException(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"failed"</span>);
}
}</pre><p>
</p><p>Notice that the count property in the <code class="literal">x-death</code> header is a <code class="literal">Long</code>.</p></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="rabbit-error-channels" href="#rabbit-error-channels"></a>40.5&nbsp;Error Channels</h2></div></div></div><p>Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
See &#8220;<a class="xref" href="">???</a>&#8221; for more information.</p><p>RabbitMQ has two types of send failures:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">Returned messages,</li><li class="listitem">Negatively acknowledged <a class="link" href="https://www.rabbitmq.com/confirms.html" target="_top">Publisher Confirms</a>.</li></ul></div><p>The latter is rare.
According to the RabbitMQ documentation "[A nack] will only be delivered if an internal error occurs in the Erlang process responsible for a queue.".</p><p>As well as enabling producer error channels (as described in &#8220;<a class="xref" href="">???</a>&#8221;), the RabbitMQ binder only sends messages to the channels if the connection factory is appropriately configured, as follows:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><code class="literal">ccf.setPublisherConfirms(true);</code></li><li class="listitem"><code class="literal">ccf.setPublisherReturns(true);</code></li></ul></div><p>When using Spring Boot configuration for the connection factory, set the following properties:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><code class="literal">spring.rabbitmq.publisher-confirms</code></li><li class="listitem"><code class="literal">spring.rabbitmq.publisher-returns</code></li></ul></div><p>The payload of the <code class="literal">ErrorMessage</code> for a returned message is a <code class="literal">ReturnedAmqpMessageException</code> with the following properties:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><code class="literal">failedMessage</code>: The spring-messaging <code class="literal">Message&lt;?&gt;</code> that failed to be sent.</li><li class="listitem"><code class="literal">amqpMessage</code>: The raw spring-amqp <code class="literal">Message</code>.</li><li class="listitem"><code class="literal">replyCode</code>: An integer value indicating the reason for the failure (for example, 312 - No route).</li><li class="listitem"><code class="literal">replyText</code>: A text value indicating the reason for the failure (for example, <code class="literal">NO_ROUTE</code>).</li><li class="listitem"><code class="literal">exchange</code>: The exchange to which the message was published.</li><li class="listitem"><code class="literal">routingKey</code>: The routing key used when the message was published.</li></ul></div><p>For negatively acknowledged confirmations, the payload is a <code class="literal">NackedAmqpMessageException</code> with the following properties:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><code class="literal">failedMessage</code>: The spring-messaging <code class="literal">Message&lt;?&gt;</code> that failed to be sent.</li><li class="listitem"><code class="literal">nackReason</code>: A reason (if available&#8201;&#8212;&#8201;you may need to examine the broker logs for more information).</li></ul></div><p>There is no automatic handling of these exceptions (such as sending to a <a class="link" href="multi__rabbitmq_binder.html#rabbit-dlq-processing" title="40.6&nbsp;Dead-Letter Queue Processing">dead-letter queue</a>).
You can consume these exceptions with your own Spring Integration flow.</p></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="rabbit-dlq-processing" href="#rabbit-dlq-processing"></a>40.6&nbsp;Dead-Letter Queue Processing</h2></div></div></div><p>Because you cannot anticipate how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them.
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue.
However, if the problem is a permanent issue, that could cause an infinite loop.
The following Spring Boot application shows an example of how to route those messages back to the original queue but moves them to a third &#8220;parking lot&#8221; queue after three attempts.
The second example uses the <a class="link" href="https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/" target="_top">RabbitMQ Delayed Message Exchange</a> to introduce a delay to the re-queued message.
In this example, the delay increases for each attempt.
These examples use a <code class="literal">@RabbitListener</code> to receive messages from the DLQ.
You could also use <code class="literal">RabbitTemplate.receive()</code> in a batch process.</p><p>The examples assume the original destination is <code class="literal">so8400in</code> and the consumer group is <code class="literal">so8400</code>.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_non_partitioned_destinations" href="#_non_partitioned_destinations"></a>40.6.1&nbsp;Non-Partitioned Destinations</h3></div></div></div><p>The first two examples are for when the destination is <span class="strong"><strong>not</strong></span> partitioned:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> ReRouteDlqApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String ORIGINAL_QUEUE = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"so8400in.so8400"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String DLQ = ORIGINAL_QUEUE + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">".dlq"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String PARKING_LOT = ORIGINAL_QUEUE + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">".parkingLot"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String X_RETRIES_HEADER = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"x-retries"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">throws</span> Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, args);
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hit enter to terminate"</span>);
System.in.read();
context.close();
}
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> RabbitTemplate rabbitTemplate;
<em><span class="hl-annotation" style="color: gray">@RabbitListener(queues = DLQ)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retriesHeader == null) {
retriesHeader = Integer.valueOf(<span class="hl-number">0</span>);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retriesHeader &lt; <span class="hl-number">3</span>) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + <span class="hl-number">1</span>);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">else</span> {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Queue parkingLot() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Queue(PARKING_LOT);
}
}</pre><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> ReRouteDlqApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String ORIGINAL_QUEUE = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"so8400in.so8400"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String DLQ = ORIGINAL_QUEUE + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">".dlq"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String PARKING_LOT = ORIGINAL_QUEUE + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">".parkingLot"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String X_RETRIES_HEADER = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"x-retries"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String DELAY_EXCHANGE = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"dlqReRouter"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">throws</span> Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, args);
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hit enter to terminate"</span>);
System.in.read();
context.close();
}
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> RabbitTemplate rabbitTemplate;
<em><span class="hl-annotation" style="color: gray">@RabbitListener(queues = DLQ)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> rePublish(Message failedMessage) {
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retriesHeader == null) {
retriesHeader = Integer.valueOf(<span class="hl-number">0</span>);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retriesHeader &lt; <span class="hl-number">3</span>) {
headers.put(X_RETRIES_HEADER, retriesHeader + <span class="hl-number">1</span>);
headers.put(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"x-delay"</span>, <span class="hl-number">5000</span> * retriesHeader);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">else</span> {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> DirectExchange delayExchange() {
DirectExchange exchange = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> exchange;
}
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Binding bindOriginalToDelay() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> BindingBuilder.bind(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Queue parkingLot() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Queue(PARKING_LOT);
}
}</pre></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_partitioned_destinations" href="#_partitioned_destinations"></a>40.6.2&nbsp;Partitioned Destinations</h3></div></div></div><p>With partitioned destinations, there is one DLQ for all partitions. We determine the original queue from the headers.</p><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="__literal_republishtodlq_false_literal" href="#__literal_republishtodlq_false_literal"></a><code class="literal">republishToDlq=false</code></h4></div></div></div><p>When <code class="literal">republishToDlq</code> is <code class="literal">false</code>, RabbitMQ publishes the message to the DLX/DLQ with an <code class="literal">x-death</code> header containing information about the original destination, as shown in the following example:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> ReRouteDlqApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String ORIGINAL_QUEUE = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"so8400in.so8400"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String DLQ = ORIGINAL_QUEUE + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">".dlq"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String PARKING_LOT = ORIGINAL_QUEUE + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">".parkingLot"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String X_DEATH_HEADER = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"x-death"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String X_RETRIES_HEADER = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"x-retries"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">throws</span> Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, args);
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hit enter to terminate"</span>);
System.in.read();
context.close();
}
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> RabbitTemplate rabbitTemplate;
<em><span class="hl-annotation" style="color: gray">@SuppressWarnings("unchecked")</span></em>
<em><span class="hl-annotation" style="color: gray">@RabbitListener(queues = DLQ)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> rePublish(Message failedMessage) {
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retriesHeader == null) {
retriesHeader = Integer.valueOf(<span class="hl-number">0</span>);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retriesHeader &lt; <span class="hl-number">3</span>) {
headers.put(X_RETRIES_HEADER, retriesHeader + <span class="hl-number">1</span>);
List&lt;Map&lt;String, ?&gt;&gt; xDeath = (List&lt;Map&lt;String, ?&gt;&gt;) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(<span class="hl-number">0</span>).get(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"exchange"</span>);
List&lt;String&gt; routingKeys = (List&lt;String&gt;) xDeath.get(<span class="hl-number">0</span>).get(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"routing-keys"</span>);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.rabbitTemplate.send(exchange, routingKeys.get(<span class="hl-number">0</span>), failedMessage);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">else</span> {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Queue parkingLot() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Queue(PARKING_LOT);
}
}</pre></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="__literal_republishtodlq_true_literal" href="#__literal_republishtodlq_true_literal"></a><code class="literal">republishToDlq=true</code></h4></div></div></div><p>When <code class="literal">republishToDlq</code> is <code class="literal">true</code>, the republishing recoverer adds the original exchange and routing key to headers, as shown in the following example:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> ReRouteDlqApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String ORIGINAL_QUEUE = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"so8400in.so8400"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String DLQ = ORIGINAL_QUEUE + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">".dlq"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String PARKING_LOT = ORIGINAL_QUEUE + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">".parkingLot"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String X_RETRIES_HEADER = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"x-retries"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">throws</span> Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, args);
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Hit enter to terminate"</span>);
System.in.read();
context.close();
}
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> RabbitTemplate rabbitTemplate;
<em><span class="hl-annotation" style="color: gray">@RabbitListener(queues = DLQ)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> rePublish(Message failedMessage) {
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retriesHeader == null) {
retriesHeader = Integer.valueOf(<span class="hl-number">0</span>);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retriesHeader &lt; <span class="hl-number">3</span>) {
headers.put(X_RETRIES_HEADER, retriesHeader + <span class="hl-number">1</span>);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">else</span> {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Queue parkingLot() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Queue(PARKING_LOT);
}
}</pre></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_partitioning_with_the_rabbitmq_binder" href="#_partitioning_with_the_rabbitmq_binder"></a>40.7&nbsp;Partitioning with the RabbitMQ Binder</h2></div></div></div><p>RabbitMQ does not support partitioning natively.</p><p>Sometimes, it is advantageous to send data to specific partitions&#8201;&#8212;&#8201;for example, when you want to strictly order message processing, all messages for a particular customer should go to the same partition.</p><p>The <code class="literal">RabbitMessageChannelBinder</code> provides partitioning by binding a queue for each partition to the destination exchange.</p><p>The following Java and YAML examples show how to configure the producer:</p><p><b>Producer.&nbsp;</b>
</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> RabbitPartitionProducerApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> Random RANDOM = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Random(System.currentTimeMillis());
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String[] data = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> String[] {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"abc1"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"def1"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"qux1"</span>,
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"abc2"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"def2"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"qux2"</span>,
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"abc3"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"def3"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"qux3"</span>,
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"abc4"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"def4"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"qux4"</span>,
};
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> SpringApplicationBuilder(RabbitPartitionProducerApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>)
.web(false)
.run(args);
}
<em><span class="hl-annotation" style="color: gray">@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Message&lt;?&gt; generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Sending: "</span> + value);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> MessageBuilder.withPayload(value)
.setHeader(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"partitionKey"</span>, value)
.build();
}
}</pre><p>
</p><p><b>application.yml.&nbsp;</b>
</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> spring</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> cloud</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> stream</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> bindings</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> output</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> destination</span>: partitioned.destination
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> producer</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> partitioned</span>: <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">true</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> partition-key-expression</span>: headers[<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">'partitionKey'</span><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">]</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> partition-count</span>: <span class="hl-number">2</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> required-groups</span>:
- myGroup</pre><p>
</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>The configuration in the prececing example uses the default partitioning (<code class="literal">key.hashCode() % partitionCount</code>).
This may or may not provide a suitably balanced algorithm, depending on the key values.
You can override this default by using the <code class="literal">partitionSelectorExpression</code> or <code class="literal">partitionSelectorClass</code> properties.</p><p>The <code class="literal">required-groups</code> property is required only if you need the consumer queues to be provisioned when the producer is deployed.
Otherwise, any messages sent to a partition are lost until the corresponding consumer is deployed.</p></td></tr></table></div><p>The following configuration provisions a topic exchange:</p><div class="informalfigure"><div class="mediaobject"><img src="images/part-exchange.png" alt="part exchange"></div></div><p>The following queues are bound to that exchange:</p><div class="informalfigure"><div class="mediaobject"><img src="images/part-queues.png" alt="part queues"></div></div><p>The following bindings associate the queues to the exchange:</p><div class="informalfigure"><div class="mediaobject"><img src="images/part-bindings.png" alt="part bindings"></div></div><p>The following Java and YAML examples continue the previous examples and show how to configure the consumer:</p><p><b>Consumer.&nbsp;</b>
</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> RabbitPartitionConsumerApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> SpringApplicationBuilder(RabbitPartitionConsumerApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>)
.web(false)
.run(args);
}
<em><span class="hl-annotation" style="color: gray">@StreamListener(Sink.INPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> listen(<em><span class="hl-annotation" style="color: gray">@Payload</span></em> String in, <em><span class="hl-annotation" style="color: gray">@Header(AmqpHeaders.CONSUMER_QUEUE)</span></em> String queue) {
System.out.println(in + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">" received from queue "</span> + queue);
}
}</pre><p>
</p><p><b>application.yml.&nbsp;</b>
</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> spring</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> cloud</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> stream</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> bindings</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> input</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> destination</span>: partitioned.destination
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> group</span>: myGroup
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> consumer</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> partitioned</span>: <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">true</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> instance-index</span>: <span class="hl-number">0</span></pre><p>
</p><div class="important" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Important"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Important]" src="images/important.png"></td><th align="left">Important</th></tr><tr><td align="left" valign="top"><p>The <code class="literal">RabbitMessageChannelBinder</code> does not support dynamic scaling.
There must be at least one consumer per partition.
The consumer&#8217;s <code class="literal">instanceIndex</code> is used to indicate which partition is consumed.
Platforms such as Cloud Foundry can have only one instance with an <code class="literal">instanceIndex</code>.</p></td></tr></table></div></div></div><div class="navfooter"><hr><table width="100%" summary="Navigation footer"><tr><td width="40%" align="left"><a accesskey="p" href="multi__apache_kafka_streams_binder.html">Prev</a>&nbsp;</td><td width="20%" align="center"><a accesskey="u" href="multi__binder_implementations.html">Up</a></td><td width="40%" align="right">&nbsp;<a accesskey="n" href="multi__spring_cloud_bus.html">Next</a></td></tr><tr><td width="40%" align="left" valign="top">39.&nbsp;Apache Kafka Streams Binder&nbsp;</td><td width="20%" align="center"><a accesskey="h" href="multi_spring-cloud.html">Home</a></td><td width="40%" align="right" valign="top">&nbsp;Part&nbsp;VII.&nbsp;Spring Cloud Bus</td></tr></table></div></body></html>