Files
spring-cloud-static/spring-cloud-stream-binder-rabbit/3.1.0.M1/reference/html/spring-cloud-stream-binder-rabbit-aggregate.html
2020-04-07 16:59:20 +00:00

1926 lines
75 KiB
HTML

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<!--[if IE]><meta http-equiv="X-UA-Compatible" content="IE=edge"><![endif]-->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="generator" content="Asciidoctor 1.5.8">
<title>Usage</title>
<link rel="stylesheet" href="css/spring.css">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<style>
.hidden {
display: none;
}
.switch {
border-width: 1px 1px 0 1px;
border-style: solid;
border-color: #7a2518;
display: inline-block;
}
.switch--item {
padding: 10px;
background-color: #ffffff;
color: #7a2518;
display: inline-block;
cursor: pointer;
}
.switch--item:not(:first-child) {
border-width: 0 0 0 1px;
border-style: solid;
border-color: #7a2518;
}
.switch--item.selected {
background-color: #7a2519;
color: #ffffff;
}
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/zepto/1.2.0/zepto.min.js"></script>
<script type="text/javascript">
function addBlockSwitches() {
$('.primary').each(function() {
primary = $(this);
createSwitchItem(primary, createBlockSwitch(primary)).item.addClass("selected");
primary.children('.title').remove();
});
$('.secondary').each(function(idx, node) {
secondary = $(node);
primary = findPrimary(secondary);
switchItem = createSwitchItem(secondary, primary.children('.switch'));
switchItem.content.addClass('hidden');
findPrimary(secondary).append(switchItem.content);
secondary.remove();
});
}
function createBlockSwitch(primary) {
blockSwitch = $('<div class="switch"></div>');
primary.prepend(blockSwitch);
return blockSwitch;
}
function findPrimary(secondary) {
candidate = secondary.prev();
while (!candidate.is('.primary')) {
candidate = candidate.prev();
}
return candidate;
}
function createSwitchItem(block, blockSwitch) {
blockName = block.children('.title').text();
content = block.children('.content').first().append(block.next('.colist'));
item = $('<div class="switch--item">' + blockName + '</div>');
item.on('click', '', content, function(e) {
$(this).addClass('selected');
$(this).siblings().removeClass('selected');
e.data.siblings('.content').addClass('hidden');
e.data.removeClass('hidden');
});
blockSwitch.append(item);
return {'item': item, 'content': content};
}
function globalSwitch() {
$('.switch--item').each(function() {
$(this).off('click');
$(this).on('click', function() {
selectedText = $(this).text()
selectedIndex = $(this).index()
$(".switch--item").filter(function() { return ($(this).text() === selectedText) }).each(function() {
$(this).addClass('selected');
$(this).siblings().removeClass('selected');
selectedContent = $(this).parent().siblings(".content").eq(selectedIndex)
selectedContent.removeClass('hidden');
selectedContent.siblings().addClass('hidden');
});
});
});
}
$(addBlockSwitches);
$(globalSwitch);
</script>
</head>
<body class="book toc2 toc-left">
<div id="header">
<div id="toc" class="toc2">
<div id="toctitle">Table of Contents</div>
<ul class="sectlevel2">
<li><a href="#_usage">Usage</a></li>
<li><a href="#_rabbitmq_binder_overview">RabbitMQ Binder Overview</a></li>
<li><a href="#_configuration_options">Configuration Options</a></li>
<li><a href="#_using_existing_queuesexchanges">Using Existing Queues/Exchanges</a></li>
<li><a href="#_retry_with_the_rabbitmq_binder">Retry With the RabbitMQ Binder</a></li>
<li><a href="#rabbit-error-channels">Error Channels</a></li>
<li><a href="#rabbit-dlq-processing">Dead-Letter Queue Processing</a></li>
<li><a href="#_partitioning_with_the_rabbitmq_binder">Partitioning with the RabbitMQ Binder</a></li>
</ul>
</div>
</div>
<div id="content">
<div id="preamble">
<div class="sectionbody">
</div>
</div>
<div class="sect2">
<h3 id="_usage"><a class="link" href="#_usage">Usage</a></h3>
<div class="paragraph">
<p>To use the RabbitMQ binder, you can add it to your Spring Cloud Stream application, by using the following Maven coordinates:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-stream-binder-rabbit&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Alternatively, you can use the Spring Cloud Stream RabbitMQ Starter, as follows:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-starter-stream-rabbit&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_rabbitmq_binder_overview"><a class="link" href="#_rabbitmq_binder_overview">RabbitMQ Binder Overview</a></h3>
<div class="paragraph">
<p>The following simplified diagram shows how the RabbitMQ binder operates:</p>
</div>
<div class="imageblock">
<div class="content">
<img src="https://raw.githubusercontent.com/spring-cloud/spring-cloud-stream-binder-rabbit/master/docs/src/main/asciidoc/images/rabbit-binder.png" alt="rabbit binder" width="300">
</div>
<div class="title">Figure 1. RabbitMQ Binder</div>
</div>
<div class="paragraph">
<p>By default, the RabbitMQ Binder implementation maps each destination to a <code>TopicExchange</code>.
For each consumer group, a <code>Queue</code> is bound to that <code>TopicExchange</code>.
Each consumer instance has a corresponding RabbitMQ <code>Consumer</code> instance for its group&#8217;s <code>Queue</code>.
For partitioned producers and consumers, the queues are suffixed with the partition index and use the partition index as the routing key.
For anonymous consumers (those with no <code>group</code> property), an auto-delete queue (with a randomized unique name) is used.</p>
</div>
<div class="paragraph">
<p>By using the optional <code>autoBindDlq</code> option, you can configure the binder to create and configure dead-letter queues (DLQs) (and a dead-letter exchange <code>DLX</code>, as well as routing infrastructure).
By default, the dead letter queue has the name of the destination, appended with <code>.dlq</code>.
If retry is enabled (<code>maxAttempts &gt; 1</code>), failed messages are delivered to the DLQ after retries are exhausted.
If retry is disabled (<code>maxAttempts = 1</code>), you should set <code>requeueRejected</code> to <code>false</code> (the default) so that failed messages are routed to the DLQ, instead of being re-queued.
In addition, <code>republishToDlq</code> causes the binder to publish a failed message to the DLQ (instead of rejecting it).
This feature lets additional information (such as the stack trace in the <code>x-exception-stacktrace</code> header) be added to the message in headers.
See the <a href="#spring-cloud-stream-rabbit-frame-max-headroom"><code>frameMaxHeadroom</code> property</a> for information about truncated stack traces.
This option does not need retry enabled.
You can republish a failed message after just one attempt.
Starting with version 1.2, you can configure the delivery mode of republished messages.
See the <a href="#spring-cloud-stream-rabbit-republish-delivery-mode"><code>republishDeliveryMode</code> property</a>.</p>
</div>
<div class="paragraph">
<p>If the stream listener throws an <code>ImmediateAcknowledgeAmqpException</code>, the DLQ is bypassed and the message simply discarded.
Starting with version 2.1, this is true regardless of the setting of <code>republishToDlq</code>; previously it was only the case when <code>republishToDlq</code> was <code>false</code>.</p>
</div>
<div class="admonitionblock important">
<table>
<tr>
<td class="icon">
<i class="fa icon-important" title="Important"></i>
</td>
<td class="content">
Setting <code>requeueRejected</code> to <code>true</code> (with <code>republishToDlq=false</code> ) causes the message to be re-queued and redelivered continually, which is likely not what you want unless the reason for the failure is transient.
In general, you should enable retry within the binder by setting <code>maxAttempts</code> to greater than one or by setting <code>republishToDlq</code> to <code>true</code>.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>See <a href="#rabbit-binder-properties">RabbitMQ Binder Properties</a> for more information about these properties.</p>
</div>
<div class="paragraph">
<p>The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue).
Some options are described in <a href="#rabbit-dlq-processing">Dead-Letter Queue Processing</a>.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
When multiple RabbitMQ binders are used in a Spring Cloud Stream application, it is important to disable 'RabbitAutoConfiguration' to avoid the same configuration from <code>RabbitAutoConfiguration</code> being applied to the two binders.
You can exclude the class by using the <code>@SpringBootApplication</code> annotation.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>Starting with version 2.0, the <code>RabbitMessageChannelBinder</code> sets the <code>RabbitTemplate.userPublisherConnection</code> property to <code>true</code> so that the non-transactional producers avoid deadlocks on consumers, which can happen if cached connections are blocked because of a <a href="https://www.rabbitmq.com/memory.html">memory alarm</a> on the broker.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
Currently, a <code>multiplex</code> consumer (a single consumer listening to multiple queues) is only supported for message-driven conssumers; polled consumers can only retrieve messages from a single queue.
</td>
</tr>
</table>
</div>
</div>
<div class="sect2">
<h3 id="_configuration_options"><a class="link" href="#_configuration_options">Configuration Options</a></h3>
<div class="paragraph">
<p>This section contains settings specific to the RabbitMQ Binder and bound channels.</p>
</div>
<div class="paragraph">
<p>For general binding configuration options and properties, see the <a href="https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_configuration_options">Spring Cloud Stream core documentation</a>.</p>
</div>
<div class="sect3">
<h4 id="rabbit-binder-properties"><a class="link" href="#rabbit-binder-properties">RabbitMQ Binder Properties</a></h4>
<div class="paragraph">
<p>By default, the RabbitMQ binder uses Spring Boot&#8217;s <code>ConnectionFactory</code>.
Conseuqently, it supports all Spring Boot configuration options for RabbitMQ.
(For reference, see the <a href="https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#common-application-properties">Spring Boot documentation</a>).
RabbitMQ configuration options use the <code>spring.rabbitmq</code> prefix.</p>
</div>
<div class="paragraph">
<p>In addition to Spring Boot options, the RabbitMQ binder supports the following properties:</p>
</div>
<div class="dlist">
<dl>
<dt class="hdlist1">spring.cloud.stream.rabbit.binder.adminAddresses</dt>
<dd>
<p>A comma-separated list of RabbitMQ management plugin URLs.
Only used when <code>nodes</code> contains more than one entry.
Each entry in this list must have a corresponding entry in <code>spring.rabbitmq.addresses</code>.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue.
See <a href="https://docs.spring.io/spring-amqp/reference/html/_reference.html#queue-affinity">Queue Affinity and the LocalizedQueueConnectionFactory</a> for more information.</p>
<div class="paragraph">
<p>Default: empty.</p>
</div>
</dd>
<dt class="hdlist1">spring.cloud.stream.rabbit.binder.nodes</dt>
<dd>
<p>A comma-separated list of RabbitMQ node names.
When more than one entry, used to locate the server address where a queue is located.
Each entry in this list must have a corresponding entry in <code>spring.rabbitmq.addresses</code>.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue.
See <a href="https://docs.spring.io/spring-amqp/reference/html/_reference.html#queue-affinity">Queue Affinity and the LocalizedQueueConnectionFactory</a> for more information.</p>
<div class="paragraph">
<p>Default: empty.</p>
</div>
</dd>
<dt class="hdlist1">spring.cloud.stream.rabbit.binder.compressionLevel</dt>
<dd>
<p>The compression level for compressed bindings.
See <code>java.util.zip.Deflater</code>.</p>
<div class="paragraph">
<p>Default: <code>1</code> (BEST_LEVEL).</p>
</div>
</dd>
<dt class="hdlist1">spring.cloud.stream.binder.connection-name-prefix</dt>
<dd>
<p>A connection name prefix used to name the connection(s) created by this binder.
The name is this prefix followed by <code>#n</code>, where <code>n</code> increments each time a new connection is opened.</p>
<div class="paragraph">
<p>Default: none (Spring AMQP default).</p>
</div>
</dd>
</dl>
</div>
</div>
<div class="sect3">
<h4 id="_rabbitmq_consumer_properties"><a class="link" href="#_rabbitmq_consumer_properties">RabbitMQ Consumer Properties</a></h4>
<div class="paragraph">
<p>The following properties are available for Rabbit consumers only and must be prefixed with <code>spring.cloud.stream.rabbit.bindings.&lt;channelName&gt;.consumer.</code>.</p>
</div>
<div class="paragraph">
<p>However if the same set of properties needs to be applied to most bindings, to
avoid repetition, Spring Cloud Stream supports setting values for all channels,
in the format of <code>spring.cloud.stream.rabbit.default.&lt;property&gt;=&lt;value&gt;</code>.</p>
</div>
<div class="paragraph">
<p>Also, keep in mind that binding specific property will override its equivalent in the default.</p>
</div>
<div class="dlist">
<dl>
<dt class="hdlist1">acknowledgeMode</dt>
<dd>
<p>The acknowledge mode.</p>
<div class="paragraph">
<p>Default: <code>AUTO</code>.</p>
</div>
</dd>
<dt class="hdlist1">anonymousGroupPrefix</dt>
<dd>
<p>When the binding has no <code>group</code> property, an anonymous, auto-delete queue is bound to the destination exchange.
The default naming stragegy for such queues results in a queue named <code>anonymous.&lt;base64 representation of a UUID&gt;</code>.
Set this property to change the prefix to something other than the default.</p>
<div class="paragraph">
<p>Default: <code>anonymous.</code>.</p>
</div>
</dd>
<dt class="hdlist1">autoBindDlq</dt>
<dd>
<p>Whether to automatically declare the DLQ and bind it to the binder DLX.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">bindingRoutingKey</dt>
<dd>
<p>The routing key with which to bind the queue to the exchange (if <code>bindQueue</code> is <code>true</code>).
Can be multiple keys - see <code>bindingRoutingKeyDelimiter</code>.
For partitioned destinations, <code>-&lt;instanceIndex&gt;</code> is appended to each key.</p>
<div class="paragraph">
<p>Default: <code>#</code>.</p>
</div>
</dd>
<dt class="hdlist1">bindingRoutingKeyDelimiter</dt>
<dd>
<p>When this is not null, 'bindingRoutingKey' is considered to be a list of keys delimited by this value; often a comma is used.</p>
<div class="paragraph">
<p>Default: <code>null</code>.</p>
</div>
</dd>
<dt class="hdlist1">bindQueue</dt>
<dd>
<p>Whether to declare the queue and bind it to the destination exchange.
Set it to <code>false</code> if you have set up your own infrastructure and have previously created and bound the queue.</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">consumerTagPrefix</dt>
<dd>
<p>Used to create the consumer tag(s); will be appended by <code>#n</code> where <code>n</code> increments for each consumer created.
Example: <code>${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}</code>.</p>
<div class="paragraph">
<p>Default: none - the broker will generate random consumer tags.</p>
</div>
</dd>
<dt class="hdlist1">containerType</dt>
<dd>
<p>Select the type of listener container to be used.
See <a href="https://docs.spring.io/spring-amqp/reference/html/_reference.html#choose-container">Choosing a Container</a> in the Spring AMQP documentation for more information.</p>
<div class="paragraph">
<p>Default: <code>simple</code></p>
</div>
</dd>
<dt class="hdlist1">deadLetterQueueName</dt>
<dd>
<p>The name of the DLQ</p>
<div class="paragraph">
<p>Default: <code>prefix+destination.dlq</code></p>
</div>
</dd>
<dt class="hdlist1">deadLetterExchange</dt>
<dd>
<p>A DLX to assign to the queue.
Relevant only if <code>autoBindDlq</code> is <code>true</code>.</p>
<div class="paragraph">
<p>Default: 'prefix+DLX'</p>
</div>
</dd>
<dt class="hdlist1">deadLetterExchangeType</dt>
<dd>
<p>The type of the DLX to assign to the queue.
Relevant only if <code>autoBindDlq</code> is <code>true</code>.</p>
<div class="paragraph">
<p>Default: 'direct'</p>
</div>
</dd>
<dt class="hdlist1">deadLetterRoutingKey</dt>
<dd>
<p>A dead letter routing key to assign to the queue.
Relevant only if <code>autoBindDlq</code> is <code>true</code>.</p>
<div class="paragraph">
<p>Default: <code>destination</code></p>
</div>
</dd>
<dt class="hdlist1">declareDlx</dt>
<dd>
<p>Whether to declare the dead letter exchange for the destination.
Relevant only if <code>autoBindDlq</code> is <code>true</code>.
Set to <code>false</code> if you have a pre-configured DLX.</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">declareExchange</dt>
<dd>
<p>Whether to declare the exchange for the destination.</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">delayedExchange</dt>
<dd>
<p>Whether to declare the exchange as a <code>Delayed Message Exchange</code>.
Requires the delayed message exchange plugin on the broker.
The <code>x-delayed-type</code> argument is set to the <code>exchangeType</code>.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">dlqBindingArguments</dt>
<dd>
<p>Arguments applied when binding the dlq to the dead letter exchange; used with <code>headers</code> <code>deadLetterExchangeType</code> to specify headers to match on.
For example <code>&#8230;&#8203;dlqBindingArguments.x-match=any</code>, <code>&#8230;&#8203;dlqBindingArguments.someHeader=someValue</code>.</p>
<div class="paragraph">
<p>Default: empty</p>
</div>
</dd>
<dt class="hdlist1">dlqDeadLetterExchange</dt>
<dd>
<p>If a DLQ is declared, a DLX to assign to that queue.</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">dlqDeadLetterRoutingKey</dt>
<dd>
<p>If a DLQ is declared, a dead letter routing key to assign to that queue.</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">dlqExpires</dt>
<dd>
<p>How long before an unused dead letter queue is deleted (in milliseconds).</p>
<div class="paragraph">
<p>Default: <code>no expiration</code></p>
</div>
</dd>
<dt class="hdlist1">dlqLazy</dt>
<dd>
<p>Declare the dead letter queue with the <code>x-queue-mode=lazy</code> argument.
See <a href="https://www.rabbitmq.com/lazy-queues.html">&#8220;Lazy Queues&#8221;</a>.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">dlqMaxLength</dt>
<dd>
<p>Maximum number of messages in the dead letter queue.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">dlqMaxLengthBytes</dt>
<dd>
<p>Maximum number of total bytes in the dead letter queue from all messages.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">dlqMaxPriority</dt>
<dd>
<p>Maximum priority of messages in the dead letter queue (0-255).</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">dlqOverflowBehavior</dt>
<dd>
<p>Action to take when <code>dlqMaxLength</code> or <code>dlqMaxLengthBytes</code> is exceeded; currently <code>drop-head</code> or <code>reject-publish</code> but refer to the RabbitMQ documentation.</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">dlqQuorum.deliveryLimit</dt>
<dd>
<p>When <code>quorum.enabled=true</code>, set a delivery limit after which the message is dropped or dead-lettered.</p>
<div class="paragraph">
<p>Default: none - broker default will apply.</p>
</div>
</dd>
<dt class="hdlist1">dlqQuorum.enabled</dt>
<dd>
<p>When true, create a quorum dead letter queue instead of a classic queue.</p>
<div class="paragraph">
<p>Default: false</p>
</div>
</dd>
<dt class="hdlist1">dlqQuorum.initialQuorumSize</dt>
<dd>
<p>When <code>quorum.enabled=true</code>, set the initial quorum size.</p>
<div class="paragraph">
<p>Default: none - broker default will apply.</p>
</div>
</dd>
<dt class="hdlist1">dlqSingleActiveConsumer</dt>
<dd>
<p>Set to true to set the <code>x-single-active-consumer</code> queue property to true.</p>
<div class="paragraph">
<p>Default: <code>false</code></p>
</div>
</dd>
<dt class="hdlist1">dlqTtl</dt>
<dd>
<p>Default time to live to apply to the dead letter queue when declared (in milliseconds).</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">durableSubscription</dt>
<dd>
<p>Whether the subscription should be durable.
Only effective if <code>group</code> is also set.</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">exchangeAutoDelete</dt>
<dd>
<p>If <code>declareExchange</code> is true, whether the exchange should be auto-deleted (that is, removed after the last queue is removed).</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">exchangeDurable</dt>
<dd>
<p>If <code>declareExchange</code> is true, whether the exchange should be durable (that is, it survives broker restart).</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">exchangeType</dt>
<dd>
<p>The exchange type: <code>direct</code>, <code>fanout</code>, <code>headers</code> or <code>topic</code> for non-partitioned destinations and <code>direct</code>, headers or <code>topic</code> for partitioned destinations.</p>
<div class="paragraph">
<p>Default: <code>topic</code>.</p>
</div>
</dd>
<dt class="hdlist1">exclusive</dt>
<dd>
<p>Whether to create an exclusive consumer.
Concurrency should be 1 when this is <code>true</code>.
Often used when strict ordering is required but enabling a hot standby instance to take over after a failure.
See <code>recoveryInterval</code>, which controls how often a standby instance attempts to consume.
Consider using <code>singleActiveConsumer</code> instead when using RabbitMQ 3.8 or later.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">expires</dt>
<dd>
<p>How long before an unused queue is deleted (in milliseconds).</p>
<div class="paragraph">
<p>Default: <code>no expiration</code></p>
</div>
</dd>
<dt class="hdlist1">failedDeclarationRetryInterval</dt>
<dd>
<p>The interval (in milliseconds) between attempts to consume from a queue if it is missing.</p>
<div class="paragraph">
<p>Default: 5000</p>
</div>
</dd>
</dl>
</div>
<div id="spring-cloud-stream-rabbit-frame-max-headroom" class="dlist">
<dl>
<dt class="hdlist1">frameMaxHeadroom</dt>
<dd>
<p>The number of bytes to reserve for other headers when adding the stack trace to a DLQ message header.
All headers must fit within the <code>frame_max</code> size configured on the broker.
Stack traces can be large; if the size plus this property exceeds <code>frame_max</code> then the stack trace will be truncated.
A WARN log will be written; consider increasing the <code>frame_max</code> or reducing the stack trace by catching the exception and throwing one with a smaller stack trace.</p>
<div class="paragraph">
<p>Default: 20000</p>
</div>
</dd>
<dt class="hdlist1">headerPatterns</dt>
<dd>
<p>Patterns for headers to be mapped from inbound messages.</p>
<div class="paragraph">
<p>Default: <code>['*']</code> (all headers).</p>
</div>
</dd>
<dt class="hdlist1">lazy</dt>
<dd>
<p>Declare the queue with the <code>x-queue-mode=lazy</code> argument.
See <a href="https://www.rabbitmq.com/lazy-queues.html">&#8220;Lazy Queues&#8221;</a>.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">maxConcurrency</dt>
<dd>
<p>The maximum number of consumers.
Not supported when the <code>containerType</code> is <code>direct</code>.</p>
<div class="paragraph">
<p>Default: <code>1</code>.</p>
</div>
</dd>
<dt class="hdlist1">maxLength</dt>
<dd>
<p>The maximum number of messages in the queue.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">maxLengthBytes</dt>
<dd>
<p>The maximum number of total bytes in the queue from all messages.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">maxPriority</dt>
<dd>
<p>The maximum priority of messages in the queue (0-255).</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">missingQueuesFatal</dt>
<dd>
<p>When the queue cannot be found, whether to treat the condition as fatal and stop the listener container.
Defaults to <code>false</code> so that the container keeps trying to consume from the queue&#8201;&#8212;&#8201;for example, when using a cluster and the node hosting a non-HA queue is down.</p>
<div class="paragraph">
<p>Default: <code>false</code></p>
</div>
</dd>
<dt class="hdlist1">overflowBehavior</dt>
<dd>
<p>Action to take when <code>maxLength</code> or <code>maxLengthBytes</code> is exceeded; currently <code>drop-head</code> or <code>reject-publish</code> but refer to the RabbitMQ documentation.</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">prefetch</dt>
<dd>
<p>Prefetch count.</p>
<div class="paragraph">
<p>Default: <code>1</code>.</p>
</div>
</dd>
<dt class="hdlist1">prefix</dt>
<dd>
<p>A prefix to be added to the name of the <code>destination</code> and queues.</p>
<div class="paragraph">
<p>Default: "".</p>
</div>
</dd>
<dt class="hdlist1">queueBindingArguments</dt>
<dd>
<p>Arguments applied when binding the queue to the exchange; used with <code>headers</code> <code>exchangeType</code> to specify headers to match on.
For example <code>&#8230;&#8203;queueBindingArguments.x-match=any</code>, <code>&#8230;&#8203;queueBindingArguments.someHeader=someValue</code>.</p>
<div class="paragraph">
<p>Default: empty</p>
</div>
</dd>
<dt class="hdlist1">queueDeclarationRetries</dt>
<dd>
<p>The number of times to retry consuming from a queue if it is missing.
Relevant only when <code>missingQueuesFatal</code> is <code>true</code>.
Otherwise, the container keeps retrying indefinitely.
Not supported when the <code>containerType</code> is <code>direct</code>.</p>
<div class="paragraph">
<p>Default: <code>3</code></p>
</div>
</dd>
<dt class="hdlist1">queueNameGroupOnly</dt>
<dd>
<p>When true, consume from a queue with a name equal to the <code>group</code>.
Otherwise the queue name is <code>destination.group</code>.
This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.</p>
<div class="paragraph">
<p>Default: false.</p>
</div>
</dd>
<dt class="hdlist1">quorum.deliveryLimit</dt>
<dd>
<p>When <code>quorum.enabled=true</code>, set a delivery limit after which the message is dropped or dead-lettered.</p>
<div class="paragraph">
<p>Default: none - broker default will apply.</p>
</div>
</dd>
<dt class="hdlist1">quorum.enabled</dt>
<dd>
<p>When true, create a quorum queue instead of a classic queue.</p>
<div class="paragraph">
<p>Default: false</p>
</div>
</dd>
<dt class="hdlist1">quorum.initialQuorumSize</dt>
<dd>
<p>When <code>quorum.enabled=true</code>, set the initial quorum size.</p>
<div class="paragraph">
<p>Default: none - broker default will apply.</p>
</div>
</dd>
<dt class="hdlist1">recoveryInterval</dt>
<dd>
<p>The interval between connection recovery attempts, in milliseconds.</p>
<div class="paragraph">
<p>Default: <code>5000</code>.</p>
</div>
</dd>
<dt class="hdlist1">requeueRejected</dt>
<dd>
<p>Whether delivery failures should be re-queued when retry is disabled or <code>republishToDlq</code> is <code>false</code>.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
</dl>
</div>
<div id="spring-cloud-stream-rabbit-republish-delivery-mode" class="dlist">
<dl>
<dt class="hdlist1">republishDeliveryMode</dt>
<dd>
<p>When <code>republishToDlq</code> is <code>true</code>, specifies the delivery mode of the republished message.</p>
<div class="paragraph">
<p>Default: <code>DeliveryMode.PERSISTENT</code></p>
</div>
</dd>
<dt class="hdlist1">republishToDlq</dt>
<dd>
<p>By default, messages that fail after retries are exhausted are rejected.
If a dead-letter queue (DLQ) is configured, RabbitMQ routes the failed message (unchanged) to the DLQ.
If set to <code>true</code>, the binder republishs failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure.
Also see the <a href="#spring-cloud-stream-rabbit-frame-max-headroom">frameMaxHeadroom property</a>.</p>
<div class="paragraph">
<p>Default: false</p>
</div>
</dd>
<dt class="hdlist1">singleActiveConsumer</dt>
<dd>
<p>Set to true to set the <code>x-single-active-consumer</code> queue property to true.</p>
<div class="paragraph">
<p>Default: <code>false</code></p>
</div>
</dd>
<dt class="hdlist1">transacted</dt>
<dd>
<p>Whether to use transacted channels.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">ttl</dt>
<dd>
<p>Default time to live to apply to the queue when declared (in milliseconds).</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">txSize</dt>
<dd>
<p>The number of deliveries between acks.
Not supported when the <code>containerType</code> is <code>direct</code>.</p>
<div class="paragraph">
<p>Default: <code>1</code>.</p>
</div>
</dd>
</dl>
</div>
</div>
<div class="sect3">
<h4 id="_advanced_listener_container_configuration"><a class="link" href="#_advanced_listener_container_configuration">Advanced Listener Container Configuration</a></h4>
<div class="paragraph">
<p>To set listener container properties that are not exposed as binder or binding properties, add a single bean of type <code>ListenerContainerCustomizer</code> to the application context.
The binder and binding properties will be set and then the customizer will be called.
The customizer (<code>configure()</code> method) is provided with the queue name as well as the consumer group as arguments.</p>
</div>
</div>
<div class="sect3">
<h4 id="_advanced_queueexchangebinding_configuration"><a class="link" href="#_advanced_queueexchangebinding_configuration">Advanced Queue/Exchange/Binding Configuration</a></h4>
<div class="paragraph">
<p>From time to time, the RabbitMQ team add new features that are enabled by setting some argument when declaring, for example, a queue.
Generally, such features are enabled in the binder by adding appropriate properties, but this may not be immediately available in a current version.
Starting with version 3.0.1, you can now add <code>DeclarableCustomizer</code> bean(s) to the application context to modify a <code>Declarable</code> (<code>Queue</code>, <code>Exchange</code> or <code>Binding</code>) just before the declaration is performed.
This allows you to add arguments that are not currently directly supported by the binder.</p>
</div>
</div>
<div class="sect3">
<h4 id="rabbit-receiving-batch"><a class="link" href="#rabbit-receiving-batch">Receiving Batched Messages</a></h4>
<div class="paragraph">
<p>Normally, if a producer binding has <code>batch-enabled=true</code> (see <a href="#rabbit-prod-props">Rabbit Producer Properties</a>), or a message is created by a <code>BatchingRabbitTemplate</code>, elements of the batch are returned as individual calls to the listener method.
Starting with version 3.0, any such batch can be presented as a <code>List&lt;?&gt;</code> to the listener method if <code>spring.cloud.stream.bindings.&lt;name&gt;.consumer.batch-mode</code> is set to <code>true</code>.</p>
</div>
</div>
<div class="sect3">
<h4 id="rabbit-prod-props"><a class="link" href="#rabbit-prod-props">Rabbit Producer Properties</a></h4>
<div class="paragraph">
<p>The following properties are available for Rabbit producers only and must be prefixed with <code>spring.cloud.stream.rabbit.bindings.&lt;channelName&gt;.producer.</code>.</p>
</div>
<div class="paragraph">
<p>However if the same set of properties needs to be applied to most bindings, to
avoid repetition, Spring Cloud Stream supports setting values for all channels,
in the format of <code>spring.cloud.stream.rabbit.default.&lt;property&gt;=&lt;value&gt;</code>.</p>
</div>
<div class="paragraph">
<p>Also, keep in mind that binding specific property will override its equivalent in the default.</p>
</div>
<div class="dlist">
<dl>
<dt class="hdlist1">autoBindDlq</dt>
<dd>
<p>Whether to automatically declare the DLQ and bind it to the binder DLX.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">batchingEnabled</dt>
<dd>
<p>Whether to enable message batching by producers.
Messages are batched into one message according to the following properties (described in the next three entries in this list): 'batchSize', <code>batchBufferLimit</code>, and <code>batchTimeout</code>.
See <a href="https://docs.spring.io/spring-amqp//reference/html/_reference.html#template-batching">Batching</a> for more information.
Also see <a href="#rabbit-receiving-batch">Receiving Batched Messages</a>.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">batchSize</dt>
<dd>
<p>The number of messages to buffer when batching is enabled.</p>
<div class="paragraph">
<p>Default: <code>100</code>.</p>
</div>
</dd>
<dt class="hdlist1">batchBufferLimit</dt>
<dd>
<p>The maximum buffer size when batching is enabled.</p>
<div class="paragraph">
<p>Default: <code>10000</code>.</p>
</div>
</dd>
<dt class="hdlist1">batchTimeout</dt>
<dd>
<p>The batch timeout when batching is enabled.</p>
<div class="paragraph">
<p>Default: <code>5000</code>.</p>
</div>
</dd>
<dt class="hdlist1">bindingRoutingKey</dt>
<dd>
<p>The routing key with which to bind the queue to the exchange (if <code>bindQueue</code> is <code>true</code>).
Can be multiple keys - see <code>bindingRoutingKeyDelimiter</code>.
For partitioned destinations, <code>-n</code> is appended to each key.
Only applies if <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>#</code>.</p>
</div>
</dd>
<dt class="hdlist1">bindingRoutingKeyDelimiter</dt>
<dd>
<p>When this is not null, 'bindingRoutingKey' is considered to be a list of keys delimited by this value; often a comma is used.
Only applies if <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>null</code>.</p>
</div>
</dd>
<dt class="hdlist1">bindQueue</dt>
<dd>
<p>Whether to declare the queue and bind it to the destination exchange.
Set it to <code>false</code> if you have set up your own infrastructure and have previously created and bound the queue.
Only applies if <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">compress</dt>
<dd>
<p>Whether data should be compressed when sent.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">confirmAckChannel</dt>
<dd>
<p>When <code>errorChannelEnabled</code> is true, a channel to which to send positive delivery acknowledgments (aka publisher confirms).
If the channel does not exist, a <code>DirectChannel</code> is registered with this name.
The connection factory must be configured to enable publisher confirms.</p>
<div class="paragraph">
<p>Default: <code>nullChannel</code> (acks are discarded).</p>
</div>
</dd>
<dt class="hdlist1">deadLetterQueueName</dt>
<dd>
<p>The name of the DLQ
Only applies if <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>prefix+destination.dlq</code></p>
</div>
</dd>
<dt class="hdlist1">deadLetterExchange</dt>
<dd>
<p>A DLX to assign to the queue.
Relevant only when <code>autoBindDlq</code> is <code>true</code>.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: 'prefix+DLX'</p>
</div>
</dd>
<dt class="hdlist1">deadLetterExchangeType</dt>
<dd>
<p>The type of the DLX to assign to the queue.
Relevant only if <code>autoBindDlq</code> is <code>true</code>.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: 'direct'</p>
</div>
</dd>
<dt class="hdlist1">deadLetterRoutingKey</dt>
<dd>
<p>A dead letter routing key to assign to the queue.
Relevant only when <code>autoBindDlq</code> is <code>true</code>.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>destination</code></p>
</div>
</dd>
<dt class="hdlist1">declareDlx</dt>
<dd>
<p>Whether to declare the dead letter exchange for the destination.
Relevant only if <code>autoBindDlq</code> is <code>true</code>.
Set to <code>false</code> if you have a pre-configured DLX.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">declareExchange</dt>
<dd>
<p>Whether to declare the exchange for the destination.</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">delayExpression</dt>
<dd>
<p>A SpEL expression to evaluate the delay to apply to the message (<code>x-delay</code> header).
It has no effect if the exchange is not a delayed message exchange.</p>
<div class="paragraph">
<p>Default: No <code>x-delay</code> header is set.</p>
</div>
</dd>
<dt class="hdlist1">delayedExchange</dt>
<dd>
<p>Whether to declare the exchange as a <code>Delayed Message Exchange</code>.
Requires the delayed message exchange plugin on the broker.
The <code>x-delayed-type</code> argument is set to the <code>exchangeType</code>.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">deliveryMode</dt>
<dd>
<p>The delivery mode.</p>
<div class="paragraph">
<p>Default: <code>PERSISTENT</code>.</p>
</div>
</dd>
<dt class="hdlist1">dlqBindingArguments</dt>
<dd>
<p>Arguments applied when binding the dlq to the dead letter exchange; used with <code>headers</code> <code>deadLetterExchangeType</code> to specify headers to match on.
For example <code>&#8230;&#8203;dlqBindingArguments.x-match=any</code>, <code>&#8230;&#8203;dlqBindingArguments.someHeader=someValue</code>.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: empty</p>
</div>
</dd>
<dt class="hdlist1">dlqDeadLetterExchange</dt>
<dd>
<p>When a DLQ is declared, a DLX to assign to that queue.
Applies only if <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">dlqDeadLetterRoutingKey</dt>
<dd>
<p>When a DLQ is declared, a dead letter routing key to assign to that queue.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">dlqExpires</dt>
<dd>
<p>How long (in milliseconds) before an unused dead letter queue is deleted.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>no expiration</code></p>
</div>
</dd>
<dt class="hdlist1">dlqLazy</dt>
<dd>
<p>Declare the dead letter queue with the <code>x-queue-mode=lazy</code> argument.
See <a href="https://www.rabbitmq.com/lazy-queues.html">&#8220;Lazy Queues&#8221;</a>.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
</dd>
<dt class="hdlist1">dlqMaxLength</dt>
<dd>
<p>Maximum number of messages in the dead letter queue.
Applies only if <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">dlqMaxLengthBytes</dt>
<dd>
<p>Maximum number of total bytes in the dead letter queue from all messages.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">dlqMaxPriority</dt>
<dd>
<p>Maximum priority of messages in the dead letter queue (0-255)
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">dlqQuorum.deliveryLimit</dt>
<dd>
<p>When <code>quorum.enabled=true</code>, set a delivery limit after which the message is dropped or dead-lettered.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: none - broker default will apply.</p>
</div>
</dd>
<dt class="hdlist1">dlqQuorum.enabled</dt>
<dd>
<p>When true, create a quorum dead letter queue instead of a classic queue.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: false</p>
</div>
</dd>
<dt class="hdlist1">dlqQuorum.initialQuorumSize</dt>
<dd>
<p>When <code>quorum.enabled=true</code>, set the initial quorum size.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: none - broker default will apply.</p>
</div>
</dd>
<dt class="hdlist1">dlqSingleActiveConsumer</dt>
<dd>
<p>Set to true to set the <code>x-single-active-consumer</code> queue property to true.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>false</code></p>
</div>
</dd>
<dt class="hdlist1">dlqTtl</dt>
<dd>
<p>Default time (in milliseconds) to live to apply to the dead letter queue when declared.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">exchangeAutoDelete</dt>
<dd>
<p>If <code>declareExchange</code> is <code>true</code>, whether the exchange should be auto-delete (it is removed after the last queue is removed).</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">exchangeDurable</dt>
<dd>
<p>If <code>declareExchange</code> is <code>true</code>, whether the exchange should be durable (survives broker restart).</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">exchangeType</dt>
<dd>
<p>The exchange type: <code>direct</code>, <code>fanout</code>, <code>headers</code> or <code>topic</code> for non-partitioned destinations and <code>direct</code>, <code>headers</code> or <code>topic</code> for partitioned destinations.</p>
<div class="paragraph">
<p>Default: <code>topic</code>.</p>
</div>
</dd>
<dt class="hdlist1">expires</dt>
<dd>
<p>How long (in milliseconds) before an unused queue is deleted.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>no expiration</code></p>
</div>
</dd>
<dt class="hdlist1">headerPatterns</dt>
<dd>
<p>Patterns for headers to be mapped to outbound messages.</p>
<div class="paragraph">
<p>Default: <code>['*']</code> (all headers).</p>
</div>
</dd>
<dt class="hdlist1">lazy</dt>
<dd>
<p>Declare the queue with the <code>x-queue-mode=lazy</code> argument.
See <a href="https://www.rabbitmq.com/lazy-queues.html">&#8220;Lazy Queues&#8221;</a>.
Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">maxLength</dt>
<dd>
<p>Maximum number of messages in the queue.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">maxLengthBytes</dt>
<dd>
<p>Maximum number of total bytes in the queue from all messages.
Only applies if <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
<dt class="hdlist1">maxPriority</dt>
<dd>
<p>Maximum priority of messages in the queue (0-255).
Only applies if <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>none</code></p>
</div>
</dd>
<dt class="hdlist1">prefix</dt>
<dd>
<p>A prefix to be added to the name of the <code>destination</code> exchange.</p>
<div class="paragraph">
<p>Default: "".</p>
</div>
</dd>
<dt class="hdlist1">queueBindingArguments</dt>
<dd>
<p>Arguments applied when binding the queue to the exchange; used with <code>headers</code> <code>exchangeType</code> to specify headers to match on.
For example <code>&#8230;&#8203;queueBindingArguments.x-match=any</code>, <code>&#8230;&#8203;queueBindingArguments.someHeader=someValue</code>.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: empty</p>
</div>
</dd>
<dt class="hdlist1">queueNameGroupOnly</dt>
<dd>
<p>When <code>true</code>, consume from a queue with a name equal to the <code>group</code>.
Otherwise the queue name is <code>destination.group</code>.
This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: false.</p>
</div>
</dd>
<dt class="hdlist1">quorum.deliveryLimit</dt>
<dd>
<p>When <code>quorum.enabled=true</code>, set a delivery limit after which the message is dropped or dead-lettered.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: none - broker default will apply.</p>
</div>
</dd>
<dt class="hdlist1">quorum.enabled</dt>
<dd>
<p>When true, create a quorum queue instead of a classic queue.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: false</p>
</div>
</dd>
<dt class="hdlist1">quorum.initialQuorumSize</dt>
<dd>
<p>When <code>quorum.enabled=true</code>, set the initial quorum size.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: none - broker default will apply.</p>
</div>
</dd>
<dt class="hdlist1">routingKeyExpression</dt>
<dd>
<p>A SpEL expression to determine the routing key to use when publishing messages.
For a fixed routing key, use a literal expression, such as <code>routingKeyExpression='my.routingKey'</code> in a properties file or <code>routingKeyExpression: '''my.routingKey'''</code> in a YAML file.</p>
<div class="paragraph">
<p>Default: <code>destination</code> or <code>destination-&lt;partition&gt;</code> for partitioned destinations.</p>
</div>
</dd>
<dt class="hdlist1">singleActiveConsumer</dt>
<dd>
<p>Set to true to set the <code>x-single-active-consumer</code> queue property to true.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>false</code></p>
</div>
</dd>
<dt class="hdlist1">transacted</dt>
<dd>
<p>Whether to use transacted channels.</p>
<div class="paragraph">
<p>Default: <code>false</code>.</p>
</div>
</dd>
<dt class="hdlist1">ttl</dt>
<dd>
<p>Default time (in milliseconds) to live to apply to the queue when declared.
Applies only when <code>requiredGroups</code> are provided and then only to those groups.</p>
<div class="paragraph">
<p>Default: <code>no limit</code></p>
</div>
</dd>
</dl>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
In the case of RabbitMQ, content type headers can be set by external applications.
Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport&#8201;&#8212;&#8201;including transports, such as Kafka (prior to 0.11), that do not natively support headers.
</td>
</tr>
</table>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_using_existing_queuesexchanges"><a class="link" href="#_using_existing_queuesexchanges">Using Existing Queues/Exchanges</a></h3>
<div class="paragraph">
<p>By default, the binder will automatically provision a topic exchange with the name being derived from the value of the destination binding property <code>&lt;prefix&gt;&lt;destination&gt;</code>.
The destination defaults to the binding name, if not provided.
When binding a consumer, a queue will automatically be provisioned with the name <code>&lt;prefix&gt;&lt;destination&gt;.&lt;group&gt;</code> (if a <code>group</code> binding property is specified), or an anonymous, auto-delete queue when there is no <code>group</code>.
The queue will be bound to the exchange with the "match-all" wildcard routing key (<code>#</code>) for a non-partitioned binding or <code>&lt;destination&gt;-&lt;instanceIndex&gt;</code> for a partitioned binding.
The prefix is an empty <code>String</code> by default.
If an output binding is specified with <code>requiredGroups</code>, a queue/binding will be provisioned for each group.</p>
</div>
<div class="paragraph">
<p>There are a number of rabbit-specific binding properties that allow you to modify this default behavior.</p>
</div>
<div class="paragraph">
<p>If you have an existing exchange/queue that you wish to use, you can completely disable automatic provisioning as follows, assuming the exchange is named <code>myExchange</code> and the queue is named <code>myQueue</code>:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>spring.cloud.stream.bindings.&lt;binding name&gt;.destination=myExhange</code></p>
</li>
<li>
<p><code>spring.cloud.stream.bindings.&lt;binding name&gt;.group=myQueue</code></p>
</li>
<li>
<p><code>spring.cloud.stream.rabbit.bindings.&lt;binding name&gt;.consumer.bindQueue=false</code></p>
</li>
<li>
<p><code>spring.cloud.stream.rabbit.bindings.&lt;binding name&gt;.consumer.declareExchange=false</code></p>
</li>
<li>
<p><code>spring.cloud.stream.rabbit.bindings.&lt;binding name&gt;.consumer.queueNameGroupOnly=true</code></p>
</li>
</ul>
</div>
<div class="paragraph">
<p>If you want the binder to provision the queue/exchange, but you want to do it using something other than the defaults discussed here, use the following properties.
Refer to the property documentation above for more information.</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>spring.cloud.stream.rabbit.bindings.&lt;binding name&gt;.consumer.bindingRoutingKey=myRoutingKey</code></p>
</li>
<li>
<p><code>spring.cloud.stream.rabbit.bindings.&lt;binding name&gt;.consumer.exchangeType=&lt;type&gt;</code></p>
</li>
<li>
<p><code>spring.cloud.stream.rabbit.bindings.&lt;binding name&gt;.producer.routingKeyExpression='myRoutingKey'</code></p>
</li>
</ul>
</div>
<div class="paragraph">
<p>There are similar properties used when declaring a dead-letter exchange/queue, when <code>autoBindDlq</code> is <code>true</code>.</p>
</div>
</div>
<div class="sect2">
<h3 id="_retry_with_the_rabbitmq_binder"><a class="link" href="#_retry_with_the_rabbitmq_binder">Retry With the RabbitMQ Binder</a></h3>
<div class="paragraph">
<p>When retry is enabled within the binder, the listener container thread is suspended for any back off periods that are configured.
This might be important when strict ordering is required with a single consumer. However, for other use cases, it prevents other messages from being processed on that thread.
An alternative to using binder retry is to set up dead lettering with time to live on the dead-letter queue (DLQ) as well as dead-letter configuration on the DLQ itself.
See &#8220;<a href="#rabbit-binder-properties">RabbitMQ Binder Properties</a>&#8221; for more information about the properties discussed here.
You can use the following example configuration to enable this feature:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Set <code>autoBindDlq</code> to <code>true</code>.
The binder create a DLQ.
Optionally, you can specify a name in <code>deadLetterQueueName</code>.</p>
</li>
<li>
<p>Set <code>dlqTtl</code> to the back off time you want to wait between redeliveries.</p>
</li>
<li>
<p>Set the <code>dlqDeadLetterExchange</code> to the default exchange.
Expired messages from the DLQ are routed to the original queue, because the default <code>deadLetterRoutingKey</code> is the queue name (<code>destination.group</code>).
Setting to the default exchange is achieved by setting the property with no value, as shown in the next example.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>To force a message to be dead-lettered, either throw an <code>AmqpRejectAndDontRequeueException</code> or set <code>requeueRejected</code> to <code>true</code> (the default) and throw any exception.</p>
</div>
<div class="paragraph">
<p>The loop continue without end, which is fine for transient problems, but you may want to give up after some number of attempts.
Fortunately, RabbitMQ provides the <code>x-death</code> header, which lets you determine how many cycles have occurred.</p>
</div>
<div class="paragraph">
<p>To acknowledge a message after giving up, throw an <code>ImmediateAcknowledgeAmqpException</code>.</p>
</div>
<div class="sect3">
<h4 id="_putting_it_all_together"><a class="link" href="#_putting_it_all_together">Putting it All Together</a></h4>
<div class="paragraph">
<p>The following configuration creates an exchange <code>myDestination</code> with queue <code>myDestination.consumerGroup</code> bound to a topic exchange with a wildcard routing key <code>#</code>:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---</code></pre>
</div>
</div>
<div class="paragraph">
<p>This configuration creates a DLQ bound to a direct exchange (<code>DLX</code>) with a routing key of <code>myDestination.consumerGroup</code>.
When messages are rejected, they are routed to the DLQ.
After 5 seconds, the message expires and is routed to the original queue by using the queue name as the routing key, as shown in the following example:</p>
</div>
<div class="listingblock">
<div class="title">Spring Boot application</div>
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in, @Header(name = "x-death", required = false) Map&lt;?,?&gt; death) {
if (death != null &amp;&amp; death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
}
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Notice that the count property in the <code>x-death</code> header is a <code>Long</code>.</p>
</div>
</div>
</div>
<div class="sect2">
<h3 id="rabbit-error-channels"><a class="link" href="#rabbit-error-channels">Error Channels</a></h3>
<div class="paragraph">
<p>Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
See &#8220;<a href="#spring-cloud-stream-overview-error-handling">[spring-cloud-stream-overview-error-handling]</a>&#8221; for more information.</p>
</div>
<div class="paragraph">
<p>RabbitMQ has two types of send failures:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Returned messages,</p>
</li>
<li>
<p>Negatively acknowledged <a href="https://www.rabbitmq.com/confirms.html">Publisher Confirms</a>.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>The latter is rare.
According to the RabbitMQ documentation "[A nack] will only be delivered if an internal error occurs in the Erlang process responsible for a queue.".</p>
</div>
<div class="paragraph">
<p>As well as enabling producer error channels (as described in &#8220;<a href="#spring-cloud-stream-overview-error-handling">[spring-cloud-stream-overview-error-handling]</a>&#8221;), the RabbitMQ binder only sends messages to the channels if the connection factory is appropriately configured, as follows:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>ccf.setPublisherConfirms(true);</code></p>
</li>
<li>
<p><code>ccf.setPublisherReturns(true);</code></p>
</li>
</ul>
</div>
<div class="paragraph">
<p>When using Spring Boot configuration for the connection factory, set the following properties:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>spring.rabbitmq.publisher-confirms</code></p>
</li>
<li>
<p><code>spring.rabbitmq.publisher-returns</code></p>
</li>
</ul>
</div>
<div class="paragraph">
<p>The payload of the <code>ErrorMessage</code> for a returned message is a <code>ReturnedAmqpMessageException</code> with the following properties:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>failedMessage</code>: The spring-messaging <code>Message&lt;?&gt;</code> that failed to be sent.</p>
</li>
<li>
<p><code>amqpMessage</code>: The raw spring-amqp <code>Message</code>.</p>
</li>
<li>
<p><code>replyCode</code>: An integer value indicating the reason for the failure (for example, 312 - No route).</p>
</li>
<li>
<p><code>replyText</code>: A text value indicating the reason for the failure (for example, <code>NO_ROUTE</code>).</p>
</li>
<li>
<p><code>exchange</code>: The exchange to which the message was published.</p>
</li>
<li>
<p><code>routingKey</code>: The routing key used when the message was published.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>For negatively acknowledged confirmations, the payload is a <code>NackedAmqpMessageException</code> with the following properties:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>failedMessage</code>: The spring-messaging <code>Message&lt;?&gt;</code> that failed to be sent.</p>
</li>
<li>
<p><code>nackReason</code>: A reason (if available&#8201;&#8212;&#8201;you may need to examine the broker logs for more information).</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>There is no automatic handling of these exceptions (such as sending to a <a href="#rabbit-dlq-processing">dead-letter queue</a>).
You can consume these exceptions with your own Spring Integration flow.</p>
</div>
</div>
<div class="sect2">
<h3 id="rabbit-dlq-processing"><a class="link" href="#rabbit-dlq-processing">Dead-Letter Queue Processing</a></h3>
<div class="paragraph">
<p>Because you cannot anticipate how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them.
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue.
However, if the problem is a permanent issue, that could cause an infinite loop.
The following Spring Boot application shows an example of how to route those messages back to the original queue but moves them to a third &#8220;parking lot&#8221; queue after three attempts.
The second example uses the <a href="https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/">RabbitMQ Delayed Message Exchange</a> to introduce a delay to the re-queued message.
In this example, the delay increases for each attempt.
These examples use a <code>@RabbitListener</code> to receive messages from the DLQ.
You could also use <code>RabbitTemplate.receive()</code> in a batch process.</p>
</div>
<div class="paragraph">
<p>The examples assume the original destination is <code>so8400in</code> and the consumer group is <code>so8400</code>.</p>
</div>
<div class="sect3">
<h4 id="_non_partitioned_destinations"><a class="link" href="#_non_partitioned_destinations">Non-Partitioned Destinations</a></h4>
<div class="paragraph">
<p>The first two examples are for when the destination is <strong>not</strong> partitioned:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader &lt; 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}</code></pre>
</div>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader &lt; 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}</code></pre>
</div>
</div>
</div>
<div class="sect3">
<h4 id="_partitioned_destinations"><a class="link" href="#_partitioned_destinations">Partitioned Destinations</a></h4>
<div class="paragraph">
<p>With partitioned destinations, there is one DLQ for all partitions. We determine the original queue from the headers.</p>
</div>
<div class="sect4">
<h5 id="_republishtodlqfalse"><a class="link" href="#_republishtodlqfalse"><code>republishToDlq=false</code></a></h5>
<div class="paragraph">
<p>When <code>republishToDlq</code> is <code>false</code>, RabbitMQ publishes the message to the DLX/DLQ with an <code>x-death</code> header containing information about the original destination, as shown in the following example:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@SuppressWarnings("unchecked")
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader &lt; 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List&lt;Map&lt;String, ?&gt;&gt; xDeath = (List&lt;Map&lt;String, ?&gt;&gt;) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List&lt;String&gt; routingKeys = (List&lt;String&gt;) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}</code></pre>
</div>
</div>
</div>
<div class="sect4">
<h5 id="_republishtodlqtrue"><a class="link" href="#_republishtodlqtrue"><code>republishToDlq=true</code></a></h5>
<div class="paragraph">
<p>When <code>republishToDlq</code> is <code>true</code>, the republishing recoverer adds the original exchange and routing key to headers, as shown in the following example:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader &lt; 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_partitioning_with_the_rabbitmq_binder"><a class="link" href="#_partitioning_with_the_rabbitmq_binder">Partitioning with the RabbitMQ Binder</a></h3>
<div class="paragraph">
<p>RabbitMQ does not support partitioning natively.</p>
</div>
<div class="paragraph">
<p>Sometimes, it is advantageous to send data to specific partitions&#8201;&#8212;&#8201;for example, when you want to strictly order message processing, all messages for a particular customer should go to the same partition.</p>
</div>
<div class="paragraph">
<p>The <code>RabbitMessageChannelBinder</code> provides partitioning by binding a queue for each partition to the destination exchange.</p>
</div>
<div class="paragraph">
<p>The following Java and YAML examples show how to configure the producer:</p>
</div>
<div class="listingblock">
<div class="title">Producer</div>
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message&lt;?&gt; generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}</code></pre>
</div>
</div>
<div class="listingblock">
<div class="title">application.yml</div>
<div class="content">
<pre class="highlightjs highlight"><code class="language-yaml hljs" data-lang="yaml"> spring:
cloud:
stream:
bindings:
output:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup</code></pre>
</div>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
<div class="paragraph">
<p>The configuration in the prececing example uses the default partitioning (<code>key.hashCode() % partitionCount</code>).
This may or may not provide a suitably balanced algorithm, depending on the key values.
You can override this default by using the <code>partitionSelectorExpression</code> or <code>partitionSelectorClass</code> properties.</p>
</div>
<div class="paragraph">
<p>The <code>required-groups</code> property is required only if you need the consumer queues to be provisioned when the producer is deployed.
Otherwise, any messages sent to a partition are lost until the corresponding consumer is deployed.</p>
</div>
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>The following configuration provisions a topic exchange:</p>
</div>
<div class="imageblock">
<div class="content">
<img src="./images/part-exchange.png" alt="part exchange">
</div>
</div>
<div class="paragraph">
<p>The following queues are bound to that exchange:</p>
</div>
<div class="imageblock">
<div class="content">
<img src="./images/part-queues.png" alt="part queues">
</div>
</div>
<div class="paragraph">
<p>The following bindings associate the queues to the exchange:</p>
</div>
<div class="imageblock">
<div class="content">
<img src="./images/part-bindings.png" alt="part bindings">
</div>
</div>
<div class="paragraph">
<p>The following Java and YAML examples continue the previous examples and show how to configure the consumer:</p>
</div>
<div class="listingblock">
<div class="title">Consumer</div>
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println(in + " received from queue " + queue);
}
}</code></pre>
</div>
</div>
<div class="listingblock">
<div class="title">application.yml</div>
<div class="content">
<pre class="highlightjs highlight"><code class="language-yaml hljs" data-lang="yaml"> spring:
cloud:
stream:
bindings:
input:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0</code></pre>
</div>
</div>
<div class="admonitionblock important">
<table>
<tr>
<td class="icon">
<i class="fa icon-important" title="Important"></i>
</td>
<td class="content">
The <code>RabbitMessageChannelBinder</code> does not support dynamic scaling.
There must be at least one consumer per partition.
The consumer&#8217;s <code>instanceIndex</code> is used to indicate which partition is consumed.
Platforms such as Cloud Foundry can have only one instance with an <code>instanceIndex</code>.
</td>
</tr>
</table>
</div>
</div>
</div>
<script type="text/javascript" src="js/tocbot/tocbot.min.js"></script>
<script type="text/javascript" src="js/toc.js"></script>
<link rel="stylesheet" href="js/highlight/styles/github.min.css">
<script src="js/highlight/highlight.min.js"></script>
<script>hljs.initHighlighting()</script>
</body>
</html>