Files
spring-cloud-static/Greenwich.RC2/multi/multi__apache_kafka_binder.html
2018-12-21 09:47:55 -05:00

311 lines
59 KiB
HTML

<html><head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>39.&nbsp;Apache Kafka Binder</title><link rel="stylesheet" type="text/css" href="css/manual-multipage.css"><meta name="generator" content="DocBook XSL Stylesheets V1.78.1"><link rel="home" href="multi_spring-cloud.html" title="Spring Cloud"><link rel="up" href="multi__binder_implementations.html" title="Part&nbsp;VI.&nbsp;Binder Implementations"><link rel="prev" href="multi__binder_implementations.html" title="Part&nbsp;VI.&nbsp;Binder Implementations"><link rel="next" href="multi__apache_kafka_streams_binder.html" title="40.&nbsp;Apache Kafka Streams Binder"></head><body bgcolor="white" text="black" link="#0000FF" vlink="#840084" alink="#0000FF"><div class="navheader"><table width="100%" summary="Navigation header"><tr><th colspan="3" align="center">39.&nbsp;Apache Kafka Binder</th></tr><tr><td width="20%" align="left"><a accesskey="p" href="multi__binder_implementations.html">Prev</a>&nbsp;</td><th width="60%" align="center">Part&nbsp;VI.&nbsp;Binder Implementations</th><td width="20%" align="right">&nbsp;<a accesskey="n" href="multi__apache_kafka_streams_binder.html">Next</a></td></tr></table><hr></div><div class="chapter"><div class="titlepage"><div><div><h2 class="title"><a name="_apache_kafka_binder" href="#_apache_kafka_binder"></a>39.&nbsp;Apache Kafka Binder</h2></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_usage" href="#_usage"></a>39.1&nbsp;Usage</h2></div></div></div><p>To use Apache Kafka binder, you need to add <code class="literal">spring-cloud-stream-binder-kafka</code> as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;dependency&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;groupId&gt;</span>org.springframework.cloud<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/groupId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;artifactId&gt;</span>spring-cloud-stream-binder-kafka<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/artifactId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/dependency&gt;</span></pre><p>Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown inn the following example for Maven:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;dependency&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;groupId&gt;</span>org.springframework.cloud<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/groupId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;artifactId&gt;</span>spring-cloud-starter-stream-kafka<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/artifactId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/dependency&gt;</span></pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_apache_kafka_binder_overview" href="#_apache_kafka_binder_overview"></a>39.2&nbsp;Apache Kafka Binder Overview</h2></div></div></div><p>The following image shows a simplified diagram of how the Apache Kafka binder operates:</p><div class="figure"><a name="d0e11855" href="#d0e11855"></a><p class="title"><b>Figure&nbsp;39.1.&nbsp;Kafka Binder</b></p><div class="figure-contents"><div class="mediaobject"><img src="images/images/kafka-binder.png" alt="kafka binder"></div></div></div><br class="figure-break"><p>The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic.
The consumer group maps directly to the same Apache Kafka concept.
Partitioning also maps directly to Apache Kafka partitions as well.</p><p>The binder currently uses the Apache Kafka <code class="literal">kafka-clients</code> 1.0.0 jar and is designed to be used with a broker of at least that version.
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the <code class="literal">autoAddPartitions</code> property.</p></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_configuration_options_2" href="#_configuration_options_2"></a>39.3&nbsp;Configuration Options</h2></div></div></div><p>This section contains the configuration options used by the Apache Kafka binder.</p><p>For common configuration options and properties pertaining to binder, see the <a class="link" href="multi__configuration_options.html#binding-properties" title="31.2&nbsp;Binding Properties">core documentation</a>.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_kafka_binder_properties" href="#_kafka_binder_properties"></a>39.3.1&nbsp;Kafka Binder Properties</h3></div></div></div><div class="variablelist"><dl class="variablelist"><dt><span class="term">spring.cloud.stream.kafka.binder.brokers</span></dt><dd><p class="simpara">A list of brokers to which the Kafka binder connects.</p><p class="simpara">Default: <code class="literal">localhost</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.defaultBrokerPort</span></dt><dd><p class="simpara"><code class="literal">brokers</code> allows hosts specified with or without port information (for example, <code class="literal">host1,host2:port2</code>).
This sets the default port when no port is configured in the broker list.</p><p class="simpara">Default: <code class="literal">9092</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.configuration</span></dt><dd><p class="simpara">Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder.
Due to the fact that these properties are used by both producers and consumers, usage should be restricted to common properties&#8201;&#8212;&#8201;for example, security settings.
Properties here supersede any properties set in boot.</p><p class="simpara">Default: Empty map.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.consumerProperties</span></dt><dd><p class="simpara">Key/Value map of arbitrary Kafka client consumer properties.
Properties here supersede any properties set in boot and in the <code class="literal">configuration</code> property above.</p><p class="simpara">Default: Empty map.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.headers</span></dt><dd><p class="simpara">The list of custom headers that are transported by the binder.
Only required when communicating with older applications (&#8656; 1.3.x) with a <code class="literal">kafka-clients</code> version &lt; 0.11.0.0. Newer versions support headers natively.</p><p class="simpara">Default: empty.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.healthTimeout</span></dt><dd><p class="simpara">The time to wait to get partition information, in seconds.
Health reports as down if this timer expires.</p><p class="simpara">Default: 10.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.requiredAcks</span></dt><dd><p class="simpara">The number of required acks on the broker.
See the Kafka documentation for the producer <code class="literal">acks</code> property.</p><p class="simpara">Default: <code class="literal">1</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.minPartitionCount</span></dt><dd><p class="simpara">Effective only if <code class="literal">autoCreateTopics</code> or <code class="literal">autoAddPartitions</code> is set.
The global minimum number of partitions that the binder configures on topics on which it produces or consumes data.
It can be superseded by the <code class="literal">partitionCount</code> setting of the producer or by the value of <code class="literal">instanceCount * concurrency</code> settings of the producer (if either is larger).</p><p class="simpara">Default: <code class="literal">1</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.producerProperties</span></dt><dd><p class="simpara">Key/Value map of arbitrary Kafka client producer properties.
Properties here supersede any properties set in boot and in the <code class="literal">configuration</code> property above.</p><p class="simpara">Default: Empty map.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.replicationFactor</span></dt><dd><p class="simpara">The replication factor of auto-created topics if <code class="literal">autoCreateTopics</code> is active.
Can be overridden on each binding.</p><p class="simpara">Default: <code class="literal">1</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.autoCreateTopics</span></dt><dd><p class="simpara">If set to <code class="literal">true</code>, the binder creates new topics automatically.
If set to <code class="literal">false</code>, the binder relies on the topics being already configured.
In the latter case, if the topics do not exist, the binder fails to start.</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>This setting is independent of the <code class="literal">auto.topic.create.enable</code> setting of the broker and does not influence it.
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.</p></td></tr></table></div><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.autoAddPartitions</span></dt><dd><p class="simpara">If set to <code class="literal">true</code>, the binder creates new partitions if required.
If set to <code class="literal">false</code>, the binder relies on the partition size of the topic being already configured.
If the partition count of the target topic is smaller than the expected value, the binder fails to start.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix</span></dt><dd><p class="simpara">Enables transactions in the binder. See <code class="literal">transaction.id</code> in the Kafka documentation and <a class="link" href="https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions" target="_top">Transactions</a> in the <code class="literal">spring-kafka</code> documentation.
When transactions are enabled, individual <code class="literal">producer</code> properties are ignored and all producers use the <code class="literal">spring.cloud.stream.kafka.binder.transaction.producer.*</code> properties.</p><p class="simpara">Default <code class="literal">null</code> (no transactions)</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.transaction.producer.*</span></dt><dd><p class="simpara">Global producer properties for producers in a transactional binder.
See <code class="literal">spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix</code> and <a class="xref" href="multi__apache_kafka_binder.html#kafka-producer-properties" title="39.3.3&nbsp;Kafka Producer Properties">Section&nbsp;39.3.3, &#8220;Kafka Producer Properties&#8221;</a> and the general producer properties supported by all binders.</p><p class="simpara">Default: See individual producer properties.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.headerMapperBeanName</span></dt><dd><p class="simpara">The bean name of a <code class="literal">KafkaHeaderMapper</code> used for mapping <code class="literal">spring-messaging</code> headers to and from Kafka headers.
Use this, for example, if you wish to customize the trusted packages in a <code class="literal">DefaultKafkaHeaderMapper</code> that uses JSON deserialization for the headers.</p><p class="simpara">Default: none.</p></dd></dl></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="kafka-consumer-properties" href="#kafka-consumer-properties"></a>39.3.2&nbsp;Kafka Consumer Properties</h3></div></div></div><p>The following properties are available for Kafka consumers only and
must be prefixed with <code class="literal">spring.cloud.stream.kafka.bindings.&lt;channelName&gt;.consumer.</code>.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">admin.configuration</span></dt><dd><p class="simpara">A <code class="literal">Map</code> of Kafka topic properties used when provisioning topics&#8201;&#8212;&#8201;for example, <code class="literal">spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0</code></p><p class="simpara">Default: none.</p></dd><dt><span class="term">admin.replicas-assignment</span></dt><dd><p class="simpara">A Map&lt;Integer, List&lt;Integer&gt;&gt; of replica assignments, with the key being the partition and the value being the assignments.
Used when provisioning new topics.
See the <code class="literal">NewTopic</code> Javadocs in the <code class="literal">kafka-clients</code> jar.</p><p class="simpara">Default: none.</p></dd><dt><span class="term">admin.replication-factor</span></dt><dd><p class="simpara">The replication factor to use when provisioning topics. Overrides the binder-wide setting.
Ignored if <code class="literal">replicas-assignments</code> is present.</p><p class="simpara">Default: none (the binder-wide default of 1 is used).</p></dd><dt><span class="term">autoRebalanceEnabled</span></dt><dd><p class="simpara">When <code class="literal">true</code>, topic partitions is automatically rebalanced between the members of a consumer group.
When <code class="literal">false</code>, each consumer is assigned a fixed set of partitions based on <code class="literal">spring.cloud.stream.instanceCount</code> and <code class="literal">spring.cloud.stream.instanceIndex</code>.
This requires both the <code class="literal">spring.cloud.stream.instanceCount</code> and <code class="literal">spring.cloud.stream.instanceIndex</code> properties to be set appropriately on each launched instance.
The value of the <code class="literal">spring.cloud.stream.instanceCount</code> property must typically be greater than 1 in this case.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">ackEachRecord</span></dt><dd><p class="simpara">When <code class="literal">autoCommitOffset</code> is <code class="literal">true</code>, this setting dictates whether to commit the offset after each record is processed.
By default, offsets are committed after all records in the batch of records returned by <code class="literal">consumer.poll()</code> have been processed.
The number of records returned by a poll can be controlled with the <code class="literal">max.poll.records</code> Kafka property, which is set through the consumer <code class="literal">configuration</code> property.
Setting this to <code class="literal">true</code> may cause a degradation in performance, but doing so reduces the likelihood of redelivered records when a failure occurs.
Also, see the binder <code class="literal">requiredAcks</code> property, which also affects the performance of committing offsets.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">autoCommitOffset</span></dt><dd><p class="simpara">Whether to autocommit offsets when a message has been processed.
If set to <code class="literal">false</code>, a header with the key <code class="literal">kafka_acknowledgment</code> of the type <code class="literal">org.springframework.kafka.support.Acknowledgment</code> header is present in the inbound message.
Applications may use this header for acknowledging messages.
See the examples section for details.
When this property is set to <code class="literal">false</code>, Kafka binder sets the ack mode to <code class="literal">org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL</code> and the application is responsible for acknowledging records.
Also see <code class="literal">ackEachRecord</code>.</p><p class="simpara">Default: <code class="literal">true</code>.</p></dd><dt><span class="term">autoCommitOnError</span></dt><dd><p class="simpara">Effective only if <code class="literal">autoCommitOffset</code> is set to <code class="literal">true</code>.
If set to <code class="literal">false</code>, it suppresses auto-commits for messages that result in errors and commits only for successful messages. It allows a stream to automatically replay from the last successfully processed message, in case of persistent failures.
If set to <code class="literal">true</code>, it always auto-commits (if auto-commit is enabled).
If not set (the default), it effectively has the same value as <code class="literal">enableDlq</code>, auto-committing erroneous messages if they are sent to a DLQ and not committing them otherwise.</p><p class="simpara">Default: not set.</p></dd><dt><span class="term">resetOffsets</span></dt><dd><p class="simpara">Whether to reset offsets on the consumer to the value provided by startOffset.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">startOffset</span></dt><dd><p class="simpara">The starting offset for new groups.
Allowed values: <code class="literal">earliest</code> and <code class="literal">latest</code>.
If the consumer group is set explicitly for the consumer 'binding' (through <code class="literal">spring.cloud.stream.bindings.&lt;channelName&gt;.group</code>), 'startOffset' is set to <code class="literal">earliest</code>. Otherwise, it is set to <code class="literal">latest</code> for the <code class="literal">anonymous</code> consumer group.
Also see <code class="literal">resetOffsets</code> (earlier in this list).</p><p class="simpara">Default: null (equivalent to <code class="literal">earliest</code>).</p></dd><dt><span class="term">enableDlq</span></dt><dd><p class="simpara">When set to true, it enables DLQ behavior for the consumer.
By default, messages that result in errors are forwarded to a topic named <code class="literal">error.&lt;destination&gt;.&lt;group&gt;</code>.
The DLQ topic name can be configurable by setting the <code class="literal">dlqName</code> property.
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
See <a class="xref" href="multi__apache_kafka_binder.html#kafka-dlq-processing" title="39.6&nbsp;Dead-Letter Topic Processing">Section&nbsp;39.6, &#8220;Dead-Letter Topic Processing&#8221;</a> processing for more information.
Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: <code class="literal">x-original-topic</code>, <code class="literal">x-exception-message</code>, and <code class="literal">x-exception-stacktrace</code> as <code class="literal">byte[]</code>.
<span class="strong"><strong>Not allowed when <code class="literal">destinationIsPattern</code> is <code class="literal">true</code>.</strong></span></p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">configuration</span></dt><dd><p class="simpara">Map with a key/value pair containing generic Kafka consumer properties.</p><p class="simpara">Default: Empty map.</p></dd><dt><span class="term">dlqName</span></dt><dd><p class="simpara">The name of the DLQ topic to receive the error messages.</p><p class="simpara">Default: null (If not specified, messages that result in errors are forwarded to a topic named <code class="literal">error.&lt;destination&gt;.&lt;group&gt;</code>).</p></dd><dt><span class="term">dlqProducerProperties</span></dt><dd><p class="simpara">Using this, DLQ-specific producer properties can be set.
All the properties available through kafka producer properties can be set through this property.</p><p class="simpara">Default: Default Kafka producer properties.</p></dd><dt><span class="term">standardHeaders</span></dt><dd><p class="simpara">Indicates which standard headers are populated by the inbound channel adapter.
Allowed values: <code class="literal">none</code>, <code class="literal">id</code>, <code class="literal">timestamp</code>, or <code class="literal">both</code>.
Useful if using native deserialization and the first component to receive a message needs an <code class="literal">id</code> (such as an aggregator that is configured to use a JDBC message store).</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">converterBeanName</span></dt><dd><p class="simpara">The name of a bean that implements <code class="literal">RecordMessageConverter</code>. Used in the inbound channel adapter to replace the default <code class="literal">MessagingMessageConverter</code>.</p><p class="simpara">Default: <code class="literal">null</code></p></dd><dt><span class="term">idleEventInterval</span></dt><dd><p class="simpara">The interval, in milliseconds, between events indicating that no messages have recently been received.
Use an <code class="literal">ApplicationListener&lt;ListenerContainerIdleEvent&gt;</code> to receive these events.
See <a class="xref" href="multi__apache_kafka_binder.html#pause-resume" title="Example: Pausing and Resuming the Consumer">the section called &#8220;Example: Pausing and Resuming the Consumer&#8221;</a> for a usage example.</p><p class="simpara">Default: <code class="literal">30000</code></p></dd><dt><span class="term">destinationIsPattern</span></dt><dd><p class="simpara">When true, the destination is treated as a regular expression <code class="literal">Pattern</code> used to match topic names by the broker.
When true, topics are not provisioned, and <code class="literal">enableDlq</code> is not allowed, because the binder does not know the topic names during the provisioning phase.
Note, the time taken to detect new topics that match the pattern is controlled by the consumer property <code class="literal">metadata.max.age.ms</code>, which (at the time of writing) defaults to 300,000ms (5 minutes).
This can be configured using the <code class="literal">configuration</code> property above.</p><p class="simpara">Default: <code class="literal">false</code></p></dd></dl></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="kafka-producer-properties" href="#kafka-producer-properties"></a>39.3.3&nbsp;Kafka Producer Properties</h3></div></div></div><p>The following properties are available for Kafka producers only and
must be prefixed with <code class="literal">spring.cloud.stream.kafka.bindings.&lt;channelName&gt;.producer.</code>.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">admin.configuration</span></dt><dd><p class="simpara">A <code class="literal">Map</code> of Kafka topic properties used when provisioning new topics&#8201;&#8212;&#8201;for example, <code class="literal">spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0</code></p><p class="simpara">Default: none.</p></dd><dt><span class="term">admin.replicas-assignment</span></dt><dd><p class="simpara">A Map&lt;Integer, List&lt;Integer&gt;&gt; of replica assignments, with the key being the partition and the value being the assignments.
Used when provisioning new topics.
See <code class="literal">NewTopic</code> javadocs in the <code class="literal">kafka-clients</code> jar.</p><p class="simpara">Default: none.</p></dd><dt><span class="term">admin.replication-factor</span></dt><dd><p class="simpara">The replication factor to use when provisioning new topics. Overrides the binder-wide setting.
Ignored if <code class="literal">replicas-assignments</code> is present.</p><p class="simpara">Default: none (the binder-wide default of 1 is used).</p></dd><dt><span class="term">bufferSize</span></dt><dd><p class="simpara">Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.</p><p class="simpara">Default: <code class="literal">16384</code>.</p></dd><dt><span class="term">sync</span></dt><dd><p class="simpara">Whether the producer is synchronous.</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">batchTimeout</span></dt><dd><p class="simpara">How long the producer waits to allow more messages to accumulate in the same batch before sending the messages.
(Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.</p><p class="simpara">Default: <code class="literal">0</code>.</p></dd><dt><span class="term">messageKeyExpression</span></dt><dd><p class="simpara">A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message&#8201;&#8212;&#8201;for example, <code class="literal">headers['myKey']</code>.
The payload cannot be used because, by the time this expression is evaluated, the payload is already in the form of a <code class="literal">byte[]</code>.</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">headerPatterns</span></dt><dd><p class="simpara">A comma-delimited list of simple patterns to match Spring messaging headers to be mapped to the Kafka <code class="literal">Headers</code> in the <code class="literal">ProducerRecord</code>.
Patterns can begin or end with the wildcard character (asterisk).
Patterns can be negated by prefixing with <code class="literal">!</code>.
Matching stops after the first match (positive or negative).
For example <code class="literal">!ask,as*</code> will pass <code class="literal">ash</code> but not <code class="literal">ask</code>.
<code class="literal">id</code> and <code class="literal">timestamp</code> are never mapped.</p><p class="simpara">Default: <code class="literal">*</code> (all headers - except the <code class="literal">id</code> and <code class="literal">timestamp</code>)</p></dd><dt><span class="term">configuration</span></dt><dd><p class="simpara">Map with a key/value pair containing generic Kafka producer properties.</p><p class="simpara">Default: Empty map.</p></dd></dl></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>The Kafka binder uses the <code class="literal">partitionCount</code> setting of the producer as a hint to create a topic with the given partition count (in conjunction with the <code class="literal">minPartitionCount</code>, the maximum of the two being the value being used).
Exercise caution when configuring both <code class="literal">minPartitionCount</code> for a binder and <code class="literal">partitionCount</code> for an application, as the larger value is used.
If a topic already exists with a smaller partition count and <code class="literal">autoAddPartitions</code> is disabled (the default), the binder fails to start.
If a topic already exists with a smaller partition count and <code class="literal">autoAddPartitions</code> is enabled, new partitions are added.
If a topic already exists with a larger number of partitions than the maximum of (<code class="literal">minPartitionCount</code> or <code class="literal">partitionCount</code>), the existing partition count is used.</p></td></tr></table></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_usage_examples" href="#_usage_examples"></a>39.3.4&nbsp;Usage examples</h3></div></div></div><p>In this section, we show the use of the preceding properties for specific scenarios.</p><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_example_setting_literal_autocommitoffset_literal_to_literal_false_literal_and_relying_on_manual_acking" href="#_example_setting_literal_autocommitoffset_literal_to_literal_false_literal_and_relying_on_manual_acking"></a>Example: Setting <code class="literal">autoCommitOffset</code> to <code class="literal">false</code> and Relying on Manual Acking</h4></div></div></div><p>This example illustrates how one may manually acknowledge offsets in a consumer application.</p><p>This example requires that <code class="literal">spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset</code> be set to <code class="literal">false</code>.
Use the corresponding input channel name for your example.</p><pre class="screen">@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message&lt;?&gt; message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}</pre></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="_example_security_configuration" href="#_example_security_configuration"></a>Example: Security Configuration</h4></div></div></div><p>Apache Kafka 0.9 supports secure connections between client and brokers.
To take advantage of this feature, follow the guidelines in the <a class="link" href="http://kafka.apache.org/090/documentation.html#security_configclients" target="_top">Apache Kafka Documentation</a> as well as the Kafka 0.9 <a class="link" href="http://docs.confluent.io/2.0.0/kafka/security.html" target="_top">security guidelines from the Confluent documentation</a>.
Use the <code class="literal">spring.cloud.stream.kafka.binder.configuration</code> option to set security properties for all clients created by the binder.</p><p>For example, to set <code class="literal">security.protocol</code> to <code class="literal">SASL_SSL</code>, set the following property:</p><pre class="screen">spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL</pre><p>All the other security properties can be set in a similar manner.</p><p>When using Kerberos, follow the instructions in the <a class="link" href="http://kafka.apache.org/090/documentation.html#security_sasl_clientconfig" target="_top">reference documentation</a> for creating and referencing the JAAS configuration.</p><p>Spring Cloud Stream supports passing JAAS configuration information to the application by using a JAAS configuration file and using Spring Boot properties.</p><div class="section"><div class="titlepage"><div><div><h5 class="title"><a name="_using_jaas_configuration_files" href="#_using_jaas_configuration_files"></a>Using JAAS Configuration Files</h5></div></div></div><p>The JAAS and (optionally) krb5 file locations can be set for Spring Cloud Stream applications by using system properties.
The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using a JAAS configuration file:</p><pre class="programlisting"> java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:<span class="hl-number">9092</span> \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT</pre></div><div class="section"><div class="titlepage"><div><div><h5 class="title"><a name="_using_spring_boot_properties" href="#_using_spring_boot_properties"></a>Using Spring Boot Properties</h5></div></div></div><p>As an alternative to having a JAAS configuration file, Spring Cloud Stream provides a mechanism for setting up the JAAS configuration for Spring Cloud Stream applications by using Spring Boot properties.</p><p>The following properties can be used to configure the login context of the Kafka client:</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">spring.cloud.stream.kafka.binder.jaas.loginModule</span></dt><dd><p class="simpara">The login module name. Not necessary to be set in normal cases.</p><p class="simpara">Default: <code class="literal">com.sun.security.auth.module.Krb5LoginModule</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.jaas.controlFlag</span></dt><dd><p class="simpara">The control flag of the login module.</p><p class="simpara">Default: <code class="literal">required</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.binder.jaas.options</span></dt><dd><p class="simpara">Map with a key/value pair containing the login module options.</p><p class="simpara">Default: Empty map.</p></dd></dl></div><p>The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using Spring Boot configuration properties:</p><pre class="programlisting"> java --spring.cloud.stream.kafka.binder.brokers=secure.server:<span class="hl-number">9092</span> \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-<span class="hl-number">1</span>@EXAMPLE.COM</pre><p>The preceding example represents the equivalent of the following JAAS file:</p><pre class="screen">KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="kafka-client-1@EXAMPLE.COM";
};</pre><p>If the topics required already exist on the broker or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent.</p><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>Do not mix JAAS configuration files and Spring Boot properties in the same application.
If the <code class="literal">-Djava.security.auth.login.config</code> system property is already present, Spring Cloud Stream ignores the Spring Boot properties.</p></td></tr></table></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>Be careful when using the <code class="literal">autoCreateTopics</code> and <code class="literal">autoAddPartitions</code> with Kerberos.
Usually, applications may use principals that do not have administrative rights in Kafka and Zookeeper.
Consequently, relying on Spring Cloud Stream to create/modify topics may fail.
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.</p></td></tr></table></div></div></div><div class="section"><div class="titlepage"><div><div><h4 class="title"><a name="pause-resume" href="#pause-resume"></a>Example: Pausing and Resuming the Consumer</h4></div></div></div><p>If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
This is facilitated by adding the <code class="literal">Consumer</code> as a parameter to your <code class="literal">@StreamListener</code>.
To resume, you need an <code class="literal">ApplicationListener</code> for <code class="literal">ListenerContainerIdleEvent</code> instances.
The frequency at which events are published is controlled by the <code class="literal">idleEventInterval</code> property.
Since the consumer is not thread-safe, you must call these methods on the calling thread.</p><p>The following simple application shows how to pause and resume:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> Application {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
SpringApplication.run(Application.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, args);
}
<em><span class="hl-annotation" style="color: gray">@StreamListener(Sink.INPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> in(String in, <em><span class="hl-annotation" style="color: gray">@Header(KafkaHeaders.CONSUMER)</span></em> Consumer&lt;?, ?&gt; consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> TopicPartition(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"myTopic"</span>, <span class="hl-number">0</span>)));
}
<em><span class="hl-annotation" style="color: gray">@Bean</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> ApplicationListener&lt;ListenerContainerIdleEvent&gt; idleListener() {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> event -&gt; {
System.out.println(event);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (event.getConsumer().paused().size() &gt; <span class="hl-number">0</span>) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}</pre></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="kafka-error-channels" href="#kafka-error-channels"></a>39.4&nbsp;Error Channels</h2></div></div></div><p>Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
See <a class="xref" href="multi__programming_model.html#spring-cloud-stream-overview-error-handling" title="29.4&nbsp;Error Handling">Section&nbsp;29.4, &#8220;Error Handling&#8221;</a> for more information.</p><p>The payload of the <code class="literal">ErrorMessage</code> for a send failure is a <code class="literal">KafkaSendFailureException</code> with properties:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem"><code class="literal">failedMessage</code>: The Spring Messaging <code class="literal">Message&lt;?&gt;</code> that failed to be sent.</li><li class="listitem"><code class="literal">record</code>: The raw <code class="literal">ProducerRecord</code> that was created from the <code class="literal">failedMessage</code></li></ul></div><p>There is no automatic handling of producer exceptions (such as sending to a <a class="link" href="multi__apache_kafka_binder.html#kafka-dlq-processing" title="39.6&nbsp;Dead-Letter Topic Processing">Dead-Letter queue</a>).
You can consume these exceptions with your own Spring Integration flow.</p></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="kafka-metrics" href="#kafka-metrics"></a>39.5&nbsp;Kafka Metrics</h2></div></div></div><p>Kafka binder module exposes the following metrics:</p><p><code class="literal">spring.cloud.stream.binder.kafka.offset</code>: This metric indicates how many messages have not been yet consumed from a given binder&#8217;s topic by a given consumer group.
The metrics provided are based on the Mircometer metrics library. The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.</p></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="kafka-dlq-processing" href="#kafka-dlq-processing"></a>39.6&nbsp;Dead-Letter Topic Processing</h2></div></div></div><p>Because you cannot anticipate how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them.
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic.
However, if the problem is a permanent issue, that could cause an infinite loop.
The sample Spring Boot application within this topic is an example of how to route those messages back to the original topic, but it moves them to a &#8220;parking lot&#8221; topic after three attempts.
The application is another spring-cloud-stream application that reads from the dead-letter topic.
It terminates when no messages are received for 5 seconds.</p><p>The examples assume the original destination is <code class="literal">so8400out</code> and the consumer group is <code class="literal">so8400</code>.</p><p>There are a couple of strategies to consider:</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">Consider running the rerouting only when the main application is not running.
Otherwise, the retries for transient errors are used up very quickly.</li><li class="listitem">Alternatively, use a two-stage approach: Use this application to route to a third topic and another to route from there back to the main topic.</li></ul></div><p>The following code listings show the sample application:</p><p><b>application.properties.&nbsp;</b>
</p><pre class="screen">spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries</pre><p>
</p><p><b>Application.&nbsp;</b>
</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(TwoOutputProcessor.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> ReRouteDlqKApplication <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">implements</span> CommandLineRunner {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String X_RETRIES_HEADER = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"x-retries"</span>;
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>, args).close();
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> AtomicInteger processed = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> AtomicInteger();
<em><span class="hl-annotation" style="color: gray">@Autowired</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> MessageChannel parkingLot;
<em><span class="hl-annotation" style="color: gray">@StreamListener(Processor.INPUT)</span></em>
<em><span class="hl-annotation" style="color: gray">@SendTo(Processor.OUTPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Message&lt;?&gt; reRoute(Message&lt;?&gt; failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retries == null) {
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"First retry for "</span> + failed);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Integer(<span class="hl-number">1</span>))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">else</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (retries.intValue() &lt; <span class="hl-number">3</span>) {
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Another retry for "</span> + failed);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Integer(retries.intValue() + <span class="hl-number">1</span>))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">else</span> {
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Retries exhausted for "</span> + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> null;
}
<em><span class="hl-annotation" style="color: gray">@Override</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> run(String... args) <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">throws</span> Exception {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">while</span> (true) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">int</span> count = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.processed.get();
Thread.sleep(<span class="hl-number">5000</span>);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">if</span> (count == <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">this</span>.processed.get()) {
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Idle, terminating"</span>);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span>;
}
}
}
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">interface</span> TwoOutputProcessor <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">extends</span> Processor {
<em><span class="hl-annotation" style="color: gray">@Output("parkingLot")</span></em>
MessageChannel parkingLot();
}
}</pre><p>
</p></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_partitioning_with_the_kafka_binder" href="#_partitioning_with_the_kafka_binder"></a>39.7&nbsp;Partitioning with the Kafka Binder</h2></div></div></div><p>Apache Kafka supports topic partitioning natively.</p><p>Sometimes it is advantageous to send data to specific partitions&#8201;&#8212;&#8201;for example, when you want to strictly order message processing (all messages for a particular customer should go to the same partition).</p><p>The following example shows how to configure the producer and consumer side:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Source.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> KafkaPartitionProducerApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> Random RANDOM = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> Random(System.currentTimeMillis());
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">private</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">final</span> String[] data = <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> String[] {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"foo1"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"bar1"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"qux1"</span>,
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"foo2"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"bar2"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"qux2"</span>,
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"foo3"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"bar3"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"qux3"</span>,
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"foo4"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"bar4"</span>, <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"qux4"</span>,
};
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> SpringApplicationBuilder(KafkaPartitionProducerApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>)
.web(false)
.run(args);
}
<em><span class="hl-annotation" style="color: gray">@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> Message&lt;?&gt; generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"Sending: "</span> + value);
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">return</span> MessageBuilder.withPayload(value)
.setHeader(<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">"partitionKey"</span>, value)
.build();
}
}</pre><p><b>application.yml.&nbsp;</b>
</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute">spring</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> cloud</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> stream</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> bindings</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> output</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> destination</span>: partitioned.topic
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> producer</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> partitioned</span>: <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">true</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> partition-key-expression</span>: headers[<span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">'partitionKey'</span><span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">]</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> partition-count</span>: <span class="hl-number">12</span></pre><p>
</p><div class="important" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Important"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Important]" src="images/important.png"></td><th align="left">Important</th></tr><tr><td align="left" valign="top"><p>The topic must be provisioned to have enough partitions to achieve the desired concurrency for all consumer groups.
The above configuration supports up to 12 consumer instances (6 if their <code class="literal">concurrency</code> is 2, 4 if their concurrency is 3, and so on).
It is generally best to &#8220;over-provision&#8221; the partitions to allow for future increases in consumers or concurrency.</p></td></tr></table></div><div class="note" style="margin-left: 0.5in; margin-right: 0.5in;"><table border="0" summary="Note"><tr><td rowspan="2" align="center" valign="top" width="25"><img alt="[Note]" src="images/note.png"></td><th align="left">Note</th></tr><tr><td align="left" valign="top"><p>The preceding configuration uses the default partitioning (<code class="literal">key.hashCode() % partitionCount</code>).
This may or may not provide a suitably balanced algorithm, depending on the key values.
You can override this default by using the <code class="literal">partitionSelectorExpression</code> or <code class="literal">partitionSelectorClass</code> properties.</p></td></tr></table></div><p>Since partitions are natively handled by Kafka, no special configuration is needed on the consumer side.
Kafka allocates partitions across the instances.</p><p>The following Spring Boot application listens to a Kafka stream and prints (to the console) the partition ID to which each message goes:</p><pre class="programlisting"><em><span class="hl-annotation" style="color: gray">@SpringBootApplication</span></em>
<em><span class="hl-annotation" style="color: gray">@EnableBinding(Sink.class)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span> KafkaPartitionConsumerApplication {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">static</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> main(String[] args) {
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">new</span> SpringApplicationBuilder(KafkaPartitionConsumerApplication.<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">class</span>)
.web(false)
.run(args);
}
<em><span class="hl-annotation" style="color: gray">@StreamListener(Sink.INPUT)</span></em>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">public</span> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">void</span> listen(<em><span class="hl-annotation" style="color: gray">@Payload</span></em> String in, <em><span class="hl-annotation" style="color: gray">@Header(KafkaHeaders.RECEIVED_PARTITION_ID)</span></em> <span xmlns:d="http://docbook.org/ns/docbook" class="hl-keyword">int</span> partition) {
System.out.println(in + <span xmlns:d="http://docbook.org/ns/docbook" class="hl-string">" received from partition "</span> + partition);
}
}</pre><p><b>application.yml.&nbsp;</b>
</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute">spring</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> cloud</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> stream</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> bindings</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> input</span>:
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> destination</span>: partitioned.topic
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-attribute"> group</span>: myGroup</pre><p>
</p><p>You can add instances as needed.
Kafka rebalances the partition allocations.
If the instance count (or <code class="literal">instance count * concurrency</code>) exceeds the number of partitions, some consumers are idle.</p></div></div><div class="navfooter"><hr><table width="100%" summary="Navigation footer"><tr><td width="40%" align="left"><a accesskey="p" href="multi__binder_implementations.html">Prev</a>&nbsp;</td><td width="20%" align="center"><a accesskey="u" href="multi__binder_implementations.html">Up</a></td><td width="40%" align="right">&nbsp;<a accesskey="n" href="multi__apache_kafka_streams_binder.html">Next</a></td></tr><tr><td width="40%" align="left" valign="top">Part&nbsp;VI.&nbsp;Binder Implementations&nbsp;</td><td width="20%" align="center"><a accesskey="h" href="multi_spring-cloud.html">Home</a></td><td width="40%" align="right" valign="top">&nbsp;40.&nbsp;Apache Kafka Streams Binder</td></tr></table></div></body></html>