<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<!--[if IE]><meta http-equiv="X-UA-Compatible" content="IE=edge"><![endif]-->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="generator" content="Asciidoctor 1.5.8">
<title>Kafka Streams Binder</title>
<link rel="stylesheet" href="css/spring.css">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<style>
.hidden {
display: none;
}
.switch {
border-width: 1px 1px 0 1px;
border-style: solid;
border-color: #7a2518;
display: inline-block;
}
.switch--item {
padding: 10px;
background-color: #ffffff;
color: #7a2518;
display: inline-block;
cursor: pointer;
}
.switch--item:not(:first-child) {
border-width: 0 0 0 1px;
border-style: solid;
border-color: #7a2518;
}
.switch--item.selected {
background-color: #7a2519;
color: #ffffff;
}
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/zepto/1.2.0/zepto.min.js"></script>
<script type="text/javascript">
function addBlockSwitches() {
$('.primary').each(function() {
primary = $(this);
createSwitchItem(primary, createBlockSwitch(primary)).item.addClass("selected");
primary.children('.title').remove();
});
$('.secondary').each(function(idx, node) {
secondary = $(node);
primary = findPrimary(secondary);
switchItem = createSwitchItem(secondary, primary.children('.switch'));
switchItem.content.addClass('hidden');
findPrimary(secondary).append(switchItem.content);
secondary.remove();
});
}
function createBlockSwitch(primary) {
blockSwitch = $('<div class="switch"></div>');
primary.prepend(blockSwitch);
return blockSwitch;
}
function findPrimary(secondary) {
candidate = secondary.prev();
while (!candidate.is('.primary')) {
candidate = candidate.prev();
}
return candidate;
}
function createSwitchItem(block, blockSwitch) {
blockName = block.children('.title').text();
content = block.children('.content').first().append(block.next('.colist'));
item = $('<div class="switch--item">' + blockName + '</div>');
item.on('click', '', content, function(e) {
$(this).addClass('selected');
$(this).siblings().removeClass('selected');
e.data.siblings('.content').addClass('hidden');
e.data.removeClass('hidden');
});
blockSwitch.append(item);
return {'item': item, 'content': content};
}
$(addBlockSwitches);
</script>
</head>
<body class="book toc2 toc-left">
<div id="header">
<div id="toc" class="toc2">
<div id="toctitle">Table of Contents</div>
<ul class="sectlevel1">
<li><a href="#_kafka_streams_binder">Kafka Streams Binder</a>
<ul class="sectlevel2">
<li><a href="#_usage">Usage</a></li>
<li><a href="#_overview">Overview</a></li>
<li><a href="#_programming_model">Programming Model</a></li>
<li><a href="#_ancillaries_to_the_programming_model">Ancillaries to the programming model</a></li>
<li><a href="#_record_serialization_and_deserialization">Record serialization and deserialization</a></li>
<li><a href="#_error_handling">Error Handling</a></li>
<li><a href="#_state_store">State Store</a></li>
<li><a href="#_interactive_queries">Interactive Queries</a></li>
<li><a href="#_health_indicator">Health Indicator</a></li>
<li><a href="#_accessing_kafka_streams_metrics">Accessing Kafka Streams Metrics</a></li>
<li><a href="#_mixing_high_level_dsl_and_low_level_processor_api">Mixing high level DSL and low level Processor API</a></li>
<li><a href="#_partition_support_on_the_outbound">Partition support on the outbound</a></li>
<li><a href="#_streamsbuilderfactorybean_customizer">StreamsBuilderFactoryBean customizer</a></li>
<li><a href="#_timestamp_extractor">Timestamp extractor</a></li>
<li><a href="#_multi_binders_with_kafka_streams_based_binders_and_regular_kafka_binder">Multi binders with Kafka Streams based binders and regular Kafka Binder</a></li>
<li><a href="#_state_cleanup">State Cleanup</a></li>
<li><a href="#_kafka_streams_topology_visualization">Kafka Streams topology visualization</a></li>
<li><a href="#_configuration_options">Configuration Options</a></li>
</ul>
</li>
</ul>
</div>
</div>
<div id="content">
<div class="sect1">
<h2 id="_kafka_streams_binder"><a class="link" href="#_kafka_streams_binder">Kafka Streams Binder</a></h2>
<div class="sectionbody">
<div class="sect2">
<h3 id="_usage"><a class="link" href="#_usage">Usage</a></h3>
<div class="paragraph">
<p>To use the Kafka Streams binder, add it to your Spring Cloud Stream application using the following Maven coordinates:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-stream-binder-kafka-streams&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>A quick way to bootstrap a new project for the Kafka Streams binder is to use <a href="http://start.spring.io">Spring Initializr</a> and then select "Cloud Streams" and "Spring for Kafka Streams", as shown below.</p>
</div>
<div class="imageblock text-center">
<div class="content">
<img src="https://raw.githubusercontent.com/spring-cloud/spring-cloud-stream-binder-kafka/1.0.x/docs/src/main/asciidoc/images/spring-initializr-kafka-streams.png" alt="spring initializr kafka streams" width="800">
</div>
</div>
</div>
<div class="sect2">
<h3 id="_overview"><a class="link" href="#_overview">Overview</a></h3>
<div class="paragraph">
<p>Spring Cloud Stream includes a binder implementation designed explicitly for <a href="https://kafka.apache.org/documentation/streams/">Apache Kafka Streams</a> binding.
With this native integration, a Spring Cloud Stream "processor" application can directly use the
<a href="https://kafka.apache.org/documentation/streams/developer-guide">Apache Kafka Streams</a> APIs in the core business logic.</p>
</div>
<div class="paragraph">
<p>Kafka Streams binder implementation builds on the foundations provided by the <a href="https://docs.spring.io/spring-kafka/reference/html/#kafka-streams">Spring for Apache Kafka</a> project.</p>
</div>
<div class="paragraph">
<p>Kafka Streams binder provides binding capabilities for the three major types in Kafka Streams - <code>KStream</code>, <code>KTable</code> and <code>GlobalKTable</code>.</p>
</div>
<div class="paragraph">
<p>Kafka Streams applications typically follow a model in which records are read from an inbound topic, business logic is applied, and the transformed records are written to an outbound topic.
Alternatively, a processor application with no outbound destination can be defined as well.</p>
</div>
<div class="paragraph">
<p>In the following sections, we are going to look at the details of Spring Cloud Stream&#8217;s integration with Kafka Streams.</p>
</div>
</div>
<div class="sect2">
<h3 id="_programming_model"><a class="link" href="#_programming_model">Programming Model</a></h3>
<div class="paragraph">
<p>When using the programming model provided by the Kafka Streams binder, you can use either the high-level <a href="https://docs.confluent.io/current/streams/developer-guide/dsl-api.html">Streams DSL</a> on its own or a mix of the high-level DSL and the lower-level <a href="https://docs.confluent.io/current/streams/developer-guide/processor-api.html">Processor API</a>.
Mixing the higher-level and lower-level APIs is usually achieved by invoking the <code>transform</code> or <code>process</code> API methods on <code>KStream</code>.</p>
</div>
<div class="sect3">
<h4 id="_functional_style"><a class="link" href="#_functional_style">Functional Style</a></h4>
<div class="paragraph">
<p>Starting with Spring Cloud Stream <code>3.0.0</code>, Kafka Streams binder allows the applications to be designed and developed using the functional programming style that is available in Java 8.
This means that the applications can be concisely represented as a lambda expression of types <code>java.util.function.Function</code> or <code>java.util.function.Consumer</code>.</p>
</div>
<div class="paragraph">
<p>Let&#8217;s take a very basic example.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer&lt;KStream&lt;Object, String&gt;&gt; process() {
        return input -&gt;
            input.foreach((key, value) -&gt; {
                System.out.println("Key: " + key + " Value: " + value);
            });
    }

    public static void main(String[] args) {
        SpringApplication.run(SimpleConsumerApplication.class, args);
    }
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Albeit simple, this is a complete standalone Spring Boot application that is leveraging Kafka Streams for stream processing.
This is a consumer application with no outbound binding and only a single inbound binding.
The application consumes data and it simply logs the information from the <code>KStream</code> key and value on the standard output.
The application contains the <code>SpringBootApplication</code> annotation and a method that is marked as <code>Bean</code>.
The bean method is of type <code>java.util.function.Consumer</code> which is parameterized with <code>KStream</code>.
Then in the implementation, we are returning a Consumer object that is essentially a lambda expression.
Inside the lambda expression, the code for processing the data is provided.</p>
</div>
<div class="paragraph">
<p>In this application, there is a single input binding that is of type <code>KStream</code>.
The binder creates this binding for the application with the name <code>process-in-0</code>, i.e. the function bean name followed by a dash character (<code>-</code>), the literal <code>in</code>, another dash, and then the ordinal position of the parameter.
You use this binding name to set other properties such as destination.
For example, <code>spring.cloud.stream.bindings.process-in-0.destination=my-topic</code>.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
If the destination property is not set on the binding, a topic is created with the same name as the binding (if there are sufficient privileges for the application) or that topic is expected to be already available.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>Once built as an uber-jar (e.g., <code>kstream-consumer-app.jar</code>), you can run the above example like the following.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic</code></pre>
</div>
</div>
<div class="paragraph">
<p>Here is another example, this time a full processor with both input and output bindings.
This is the classic word-count example, in which the application receives data from a topic and computes the number of occurrences of each word in a tumbling time window.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@SpringBootApplication
public class WordCountProcessorApplication {

    @Bean
    public Function&lt;KStream&lt;Object, String&gt;, KStream&lt;?, WordCount&gt;&gt; process() {
        return input -&gt; input
            .flatMapValues(value -&gt; Arrays.asList(value.toLowerCase().split("\\W+")))
            .map((key, value) -&gt; new KeyValue&lt;&gt;(value, value))
            // Grouped supersedes the deprecated Serialized class
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // 5-second tumbling window; Duration is java.time.Duration
            .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
            .count(Materialized.as("word-counts-state-store"))
            .toStream()
            .map((key, value) -&gt; new KeyValue&lt;&gt;(key.key(), new WordCount(key.key(), value,
                new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Here again, this is a complete Spring Boot application. The difference from the first application is that the bean method is of type <code>java.util.function.Function</code>.
The first type parameter of the <code>Function</code> is the input <code>KStream</code> and the second one is the output.
In the method body, a lambda expression of type <code>Function</code> is provided, and its implementation contains the actual business logic.
As in the previously discussed <code>Consumer</code>-based application, the input binding here is named <code>process-in-0</code> by default. The output binding name is likewise set automatically to <code>process-out-0</code>.</p>
</div>
<div class="paragraph">
<p>Once built as an uber-jar (e.g., <code>wordcount-processor.jar</code>), you can run the above example like the following.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts</code></pre>
</div>
</div>
<div class="paragraph">
<p>This application consumes messages from the Kafka topic <code>words</code> and publishes the computed results to the output
topic <code>counts</code>.</p>
</div>
<div class="paragraph">
<p>Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
required in the processor. Setting up Kafka Streams specific configuration required by the Kafka Streams infrastructure
is automatically handled by the framework.</p>
</div>
<div class="paragraph">
<p>The two examples we saw above have a single <code>KStream</code> input binding. In both cases, the bindings received the records from a single topic.
If you want to multiplex multiple topics into a single <code>KStream</code> binding, you can provide comma-separated Kafka topics as the destination, as shown below.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3</code></p>
</div>
<div class="paragraph">
<p>In addition, you can provide a topic pattern as the destination if you want to match topics against a regular expression.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.bindings.process-in-0.destination=input.*</code></p>
</div>
<div class="sect4">
<h5 id="_multiple_input_bindings"><a class="link" href="#_multiple_input_bindings">Multiple Input Bindings</a></h5>
<div class="paragraph">
<p>Many non-trivial Kafka Streams applications consume data from more than one topic through multiple bindings.
For instance, one topic is consumed as <code>KStream</code> and another as <code>KTable</code> or <code>GlobalKTable</code>.
There are many reasons why an application might want to receive data as a table type.
Think of a use case where the underlying topic is populated through a change data capture (CDC) mechanism from a database, or where the application only cares about the latest updates for downstream processing.
If the application specifies that the data needs to be bound as <code>KTable</code> or <code>GlobalKTable</code>, then the Kafka Streams binder will properly bind the destination to a <code>KTable</code> or <code>GlobalKTable</code> and make it available for the application to operate upon.
We will look at a few different scenarios showing how multiple input bindings are handled in the Kafka Streams binder.</p>
</div>
<div class="sect5">
<h6 id="_bifunction_in_kafka_streams_binder"><a class="link" href="#_bifunction_in_kafka_streams_binder">BiFunction in Kafka Streams Binder</a></h6>
<div class="paragraph">
<p>Here is an example where we have two inputs and an output. In this case, the application can use <code>java.util.function.BiFunction</code>.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public BiFunction&lt;KStream&lt;String, Long&gt;, KTable&lt;String, String&gt;, KStream&lt;String, Long&gt;&gt; process() {
    return (userClicksStream, userRegionsTable) -&gt; (userClicksStream
        .leftJoin(userRegionsTable, (clicks, region) -&gt; new RegionWithClicks(region == null ?
                "UNKNOWN" : region, clicks),
            Joined.with(Serdes.String(), Serdes.Long(), null))
        .map((user, regionWithClicks) -&gt; new KeyValue&lt;&gt;(regionWithClicks.getRegion(),
            regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)
        .toStream());
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Here again, the basic theme is the same as in the previous examples, but here we have two inputs.
Java&#8217;s <code>BiFunction</code> support is used to bind the inputs to the desired destinations.
The default binding names generated by the binder for the inputs are <code>process-in-0</code> and <code>process-in-1</code> respectively. The default output binding is <code>process-out-0</code>.
In this example, the first parameter of <code>BiFunction</code> is bound as a <code>KStream</code> for the first input and the second parameter is bound as a <code>KTable</code> for the second input.</p>
</div>
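<div class="paragraph">
<p>As a minimal sketch, the three default binding names above could be mapped to Kafka topics in <code>application.properties</code> as follows. The topic names <code>user-clicks</code>, <code>user-regions</code> and <code>clicks-per-region</code> are hypothetical placeholders chosen for illustration; the binder does not require them.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Hypothetical topic names; replace them with your own.
# First BiFunction argument (KStream)
spring.cloud.stream.bindings.process-in-0.destination=user-clicks
# Second BiFunction argument (KTable)
spring.cloud.stream.bindings.process-in-1.destination=user-regions
# Returned KStream
spring.cloud.stream.bindings.process-out-0.destination=clicks-per-region</code></pre>
</div>
</div>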
</div>
<div class="sect5">
<h6 id="_biconsumer_in_kafka_streams_binder"><a class="link" href="#_biconsumer_in_kafka_streams_binder">BiConsumer in Kafka Streams Binder</a></h6>
<div class="paragraph">
<p>If there are two inputs but no outputs, we can use <code>java.util.function.BiConsumer</code> as shown below.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public BiConsumer&lt;KStream&lt;String, Long&gt;, KTable&lt;String, String&gt;&gt; process() {
    return (userClicksStream, userRegionsTable) -&gt; {};
}</code></pre>
</div>
</div>
</div>
<div class="sect5">
<h6 id="_beyond_two_inputs"><a class="link" href="#_beyond_two_inputs">Beyond two inputs</a></h6>
<div class="paragraph">
<p>What if you have more than two inputs?
In that case, the binder allows you to chain partial functions.
In functional programming jargon, this technique is generally known as currying.
With the functional programming support added as part of Java 8, Java now enables you to write curried functions.
Spring Cloud Stream Kafka Streams binder can make use of this feature to enable multiple input bindings.</p>
</div>
<div class="paragraph">
<p>Let&#8217;s see an example.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Function&lt;KStream&lt;Long, Order&gt;,
        Function&lt;GlobalKTable&lt;Long, Customer&gt;,
                Function&lt;GlobalKTable&lt;Long, Product&gt;, KStream&lt;Long, EnrichedOrder&gt;&gt;&gt;&gt; enrichOrder() {
    return orders -&gt; (
        customers -&gt; (
            products -&gt; (
                orders.join(customers,
                        (orderId, order) -&gt; order.getCustomerId(),
                        (order, customer) -&gt; new CustomerOrder(customer, order))
                    .join(products,
                        (orderId, customerOrder) -&gt; customerOrder.productId(),
                        (customerOrder, product) -&gt; {
                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                            enrichedOrder.setProduct(product);
                            enrichedOrder.setCustomer(customerOrder.customer);
                            enrichedOrder.setOrder(customerOrder.order);
                            return enrichedOrder;
                        })
            )
        )
    );
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Let&#8217;s look at the details of the binding model presented above.
In this model, we have three partially applied functions on the inbound side. Let&#8217;s call them <code>f(x)</code>, <code>f(y)</code> and <code>f(z)</code>.
If we expand these functions in the sense of true mathematical functions, the chain looks like this: <code>f(x) &#8594; f(y) &#8594; f(z) &#8594; KStream&lt;Long, EnrichedOrder&gt;</code>.
The <code>x</code> variable stands for <code>KStream&lt;Long, Order&gt;</code>, the <code>y</code> variable stands for <code>GlobalKTable&lt;Long, Customer&gt;</code> and the <code>z</code> variable stands for <code>GlobalKTable&lt;Long, Product&gt;</code>.
The first function <code>f(x)</code> takes the first input binding of the application (<code>KStream&lt;Long, Order&gt;</code>) and its output is the function <code>f(y)</code>.
The function <code>f(y)</code> takes the second input binding of the application (<code>GlobalKTable&lt;Long, Customer&gt;</code>) and its output is yet another function, <code>f(z)</code>.
The input for the function <code>f(z)</code> is the third input of the application (<code>GlobalKTable&lt;Long, Product&gt;</code>) and its output is <code>KStream&lt;Long, EnrichedOrder&gt;</code>, which is the final output binding of the application.
The inputs of the three partial functions (<code>KStream</code>, <code>GlobalKTable</code> and <code>GlobalKTable</code>, respectively) are available in the method body for implementing the business logic as part of the lambda expression.</p>
</div>
<div class="paragraph">
<p>The input bindings are named <code>enrichOrder-in-0</code>, <code>enrichOrder-in-1</code> and <code>enrichOrder-in-2</code>, respectively. The output binding is named <code>enrichOrder-out-0</code>.</p>
</div>
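<div class="paragraph">
<p>The following sketch shows how these four binding names might be wired to topics in <code>application.properties</code>. The topic names (<code>orders</code>, <code>customers</code>, <code>products</code>, <code>enriched-orders</code>) are assumptions made for illustration only.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Hypothetical topic names; replace them with your own.
# Outermost function input (KStream of orders)
spring.cloud.stream.bindings.enrichOrder-in-0.destination=orders
# Second partial function input (GlobalKTable of customers)
spring.cloud.stream.bindings.enrichOrder-in-1.destination=customers
# Third partial function input (GlobalKTable of products)
spring.cloud.stream.bindings.enrichOrder-in-2.destination=products
# Final KStream returned by the innermost function
spring.cloud.stream.bindings.enrichOrder-out-0.destination=enriched-orders</code></pre>
</div>
</div>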
<div class="paragraph">
<p>With curried functions, you can have virtually any number of inputs. However, keep in mind that in Java, anything beyond a small number of inputs and their partially applied functions, as in the example above, tends to produce unreadable code.
Therefore, if your Kafka Streams application requires more than a few input bindings and you want to use this functional model, you may want to rethink your design and decompose the application appropriately.</p>
</div>
</div>
</div>
<div class="sect4">
<h5 id="_multiple_output_bindings"><a class="link" href="#_multiple_output_bindings">Multiple Output Bindings</a></h5>
<div class="paragraph">
<p>Kafka Streams allows you to write outbound data to multiple topics. This feature is known as branching in Kafka Streams.
When using multiple output bindings, you need to provide an array of <code>KStream</code> (<code>KStream[]</code>) as the outbound return type.</p>
</div>
<div class="paragraph">
<p>Here is an example:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Function&lt;KStream&lt;Object, String&gt;, KStream&lt;?, WordCount&gt;[]&gt; process() {
    Predicate&lt;Object, WordCount&gt; isEnglish = (k, v) -&gt; v.word.equals("english");
    Predicate&lt;Object, WordCount&gt; isFrench = (k, v) -&gt; v.word.equals("french");
    Predicate&lt;Object, WordCount&gt; isSpanish = (k, v) -&gt; v.word.equals("spanish");
    return input -&gt; input
        .flatMapValues(value -&gt; Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, value) -&gt; value)
        // 5-second tumbling window; Duration is java.time.Duration
        .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
        .count(Materialized.as("WordCounts-branch"))
        .toStream()
        .map((key, value) -&gt; new KeyValue&lt;&gt;(null, new WordCount(key.key(), value,
            new Date(key.window().start()), new Date(key.window().end()))))
        // One output stream per predicate, in the order given
        .branch(isEnglish, isFrench, isSpanish);
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The programming model remains the same; however, the outbound parameterized type is <code>KStream[]</code>.
The default output binding names are <code>process-out-0</code>, <code>process-out-1</code> and <code>process-out-2</code>, respectively.
The binder generates three output bindings because it detects the length of the returned <code>KStream</code> array.</p>
</div>
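<div class="paragraph">
<p>As an illustrative sketch, each generated output binding can be given its own destination. The configuration below assumes that the array elements follow the order of the predicates passed to <code>branch</code> (English, French, Spanish); all topic names are hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Hypothetical topic names; replace them with your own.
spring.cloud.stream.bindings.process-in-0.destination=words
# Array index 0: records matching isEnglish
spring.cloud.stream.bindings.process-out-0.destination=english-counts
# Array index 1: records matching isFrench
spring.cloud.stream.bindings.process-out-1.destination=french-counts
# Array index 2: records matching isSpanish
spring.cloud.stream.bindings.process-out-2.destination=spanish-counts</code></pre>
</div>
</div>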
</div>
<div class="sect4">
<h5 id="_summary_of_function_based_programming_styles_for_kafka_streams"><a class="link" href="#_summary_of_function_based_programming_styles_for_kafka_streams">Summary of Function based Programming Styles for Kafka Streams</a></h5>
<div class="paragraph">
<p>In summary, the following table shows the various options that can be used in the functional paradigm.</p>
</div>
<table class="tableblock frame-all grid-all stretch">
<colgroup>
<col style="width: 33.3333%;">
<col style="width: 33.3333%;">
<col style="width: 33.3334%;">
</colgroup>
<thead>
<tr>
<th class="tableblock halign-left valign-top">Number of Inputs</th>
<th class="tableblock halign-left valign-top">Number of Outputs</th>
<th class="tableblock halign-left valign-top">Component to use</th>
</tr>
</thead>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">1</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">0</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">java.util.function.Consumer</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">2</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">0</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">java.util.function.BiConsumer</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">1</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">1..n</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">java.util.function.Function</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">2</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">1..n</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">java.util.function.BiFunction</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">&gt;= 3</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">0..n</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Use curried functions</p></td>
</tr>
</tbody>
</table>
<div class="ulist">
<ul>
<li>
<p>In the case of more than one output in this table, the type simply becomes <code>KStream[]</code>.</p>
</li>
</ul>
</div>
</div>
</div>
<div class="sect3">
<h4 id="_imperative_programming_model"><a class="link" href="#_imperative_programming_model">Imperative programming model</a></h4>
<div class="paragraph">
<p>Although the functional programming model outlined above is the preferred approach, you can still use the classic <code>StreamListener</code> based approach if you prefer.</p>
</div>
<div class="paragraph">
<p>Here are some examples.</p>
</div>
<div class="paragraph">
<p>Following is the equivalent of the Word count example using <code>StreamListener</code>.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream&lt;?, WordCount&gt; process(KStream&lt;?, String&gt; input) {
        return input
            .flatMapValues(value -&gt; Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -&gt; value)
            // 5-second tumbling window; Duration is java.time.Duration
            .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
            .count(Materialized.as("WordCounts-multi"))
            .toStream()
            .map((key, value) -&gt; new KeyValue&lt;&gt;(null, new WordCount(key.key(), value,
                new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>As you can see, this is a bit more verbose since you need to provide <code>EnableBinding</code> and the other extra annotations like <code>StreamListener</code> and <code>SendTo</code> to make it a complete application.
<code>EnableBinding</code> is where you specify your binding interface that contains your bindings.
In this case, we are using the stock <code>KafkaStreamsProcessor</code> binding interface, which declares the following contract.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>public interface KafkaStreamsProcessor {

    @Input("input")
    KStream&lt;?, ?&gt; input();

    @Output("output")
    KStream&lt;?, ?&gt; output();
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The binder creates bindings for the input <code>KStream</code> and the output <code>KStream</code> because you are using a binding interface that contains those declarations.</p>
</div>
<div class="paragraph">
<p>In addition to the obvious differences in the programming model offered in the functional style, one particular thing that needs to be mentioned here is that the binding names are what you specify in the binding interface.
For example, in the above application, since we are using <code>KafkaStreamsProcessor</code>, the binding names are <code>input</code> and <code>output</code>.
Binding properties need to use those names. For instance <code>spring.cloud.stream.bindings.input.destination</code>, <code>spring.cloud.stream.bindings.output.destination</code> etc.
Keep in mind that this is fundamentally different from the functional style since there the binder generates binding names for the application.
This is because the application does not provide any binding interfaces in the functional model using <code>EnableBinding</code>.</p>
</div>
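<div class="paragraph">
<p>For example, with the stock <code>KafkaStreamsProcessor</code> interface, the destinations could be configured as in the sketch below. The topic names <code>words</code> and <code>counts</code> are assumed here purely for illustration.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Binding names come from the @Input and @Output values in KafkaStreamsProcessor.
# The topic names are hypothetical; replace them with your own.
spring.cloud.stream.bindings.input.destination=words
spring.cloud.stream.bindings.output.destination=counts</code></pre>
</div>
</div>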
<div class="paragraph">
<p>Here is another example of a sink where we have two inputs.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream&lt;String, PlayEvent&gt; playEvents,
        @Input("inputTable") KTable&lt;Long, Song&gt; songTable) {
    ....
    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream&lt;?, ?&gt; inputStream();

    @Input("inputTable")
    KTable&lt;?, ?&gt; inputTable();
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Following is the <code>StreamListener</code> equivalent of the same <code>BiFunction</code> based processor that we saw above.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream&lt;String, Long&gt; process(@Input("input") KStream&lt;String, Long&gt; userClicksStream,
        @Input("inputTable") KTable&lt;String, String&gt; userRegionsTable) {
    ....
    ....
}

// "input" and "output" are inherited from KafkaStreamsProcessor
interface KStreamKTableBinding extends KafkaStreamsProcessor {

    // Must match the name used in @Input on the listener method
    @Input("inputTable")
    KTable&lt;?, ?&gt; inputTable();
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Finally, here is the <code>StreamListener</code> equivalent of the application with three inputs and curried functions.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
@StreamListener
@SendTo("output")
public KStream&lt;Long, EnrichedOrder&gt; process(
        @Input("input-1") KStream&lt;Long, Order&gt; ordersStream,
        @Input("input-2") GlobalKTable&lt;Long, Customer&gt; customers,
        @Input("input-3") GlobalKTable&lt;Long, Product&gt; products) {
    KStream&lt;Long, CustomerOrder&gt; customerOrdersStream = ordersStream.join(
        customers, (orderId, order) -&gt; order.getCustomerId(),
        (order, customer) -&gt; new CustomerOrder(customer, order));
    return customerOrdersStream.join(products,
        (orderId, customerOrder) -&gt; customerOrder.productId(),
        (customerOrder, product) -&gt; {
            EnrichedOrder enrichedOrder = new EnrichedOrder();
            enrichedOrder.setProduct(product);
            enrichedOrder.setCustomer(customerOrder.customer);
            enrichedOrder.setOrder(customerOrder.order);
            return enrichedOrder;
        });
}

interface CustomGlobalKTableProcessor {

    @Input("input-1")
    KStream&lt;?, ?&gt; input1();

    @Input("input-2")
    GlobalKTable&lt;?, ?&gt; input2();

    @Input("input-3")
    GlobalKTable&lt;?, ?&gt; input3();

    @Output("output")
    KStream&lt;?, ?&gt; output();
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>You might notice that the above two examples are even more verbose since, in addition to providing <code>EnableBinding</code>, you also need to write your own custom binding interface.
Using the functional model, you can avoid all those ceremonial details.</p>
</div>
<div class="paragraph">
<p>Before we move on from looking at the general programming model offered by Kafka Streams binder, here is the <code>StreamListener</code> version of multiple output bindings.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream&lt;?, WordCount&gt;[] process(KStream&lt;Object, String&gt; input) {
Predicate&lt;Object, WordCount&gt; isEnglish = (k, v) -&gt; v.word.equals("english");
Predicate&lt;Object, WordCount&gt; isFrench = (k, v) -&gt; v.word.equals("french");
Predicate&lt;Object, WordCount&gt; isSpanish = (k, v) -&gt; v.word.equals("spanish");
return input
.flatMapValues(value -&gt; Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -&gt; value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -&gt; new KeyValue&lt;&gt;(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
interface KStreamProcessorWithBranches {
@Input("input")
KStream&lt;?, ?&gt; input();
@Output("output1")
KStream&lt;?, ?&gt; output1();
@Output("output2")
KStream&lt;?, ?&gt; output2();
@Output("output3")
KStream&lt;?, ?&gt; output3();
}
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>To recap, we have reviewed the various programming model choices when using the Kafka Streams binder.</p>
</div>
<div class="paragraph">
<p>The binder provides binding capabilities for <code>KStream</code>, <code>KTable</code> and <code>GlobalKTable</code> on the input.
<code>KTable</code> and <code>GlobalKTable</code> bindings are only available on the input.
Binder supports both input and output bindings for <code>KStream</code>.</p>
</div>
<div class="paragraph">
<p>The upshot of the programming model of Kafka Streams binder is that the binder provides you the flexibility of going with a fully functional programming model or using the <code>StreamListener</code> based imperative approach.</p>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_ancillaries_to_the_programming_model"><a class="link" href="#_ancillaries_to_the_programming_model">Ancillaries to the programming model</a></h3>
<div class="sect3">
<h4 id="_multiple_kafka_streams_processors_within_a_single_application"><a class="link" href="#_multiple_kafka_streams_processors_within_a_single_application">Multiple Kafka Streams processors within a single application</a></h4>
<div class="paragraph">
<p>The binder allows you to have multiple Kafka Streams processors within a single Spring Cloud Stream application.
You can have an application such as the following.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public java.util.function.Function&lt;KStream&lt;Object, String&gt;, KStream&lt;Object, String&gt;&gt; process() {
...
}
@Bean
public java.util.function.Consumer&lt;KStream&lt;Object, String&gt;&gt; anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction&lt;KStream&lt;Object, String&gt;, KTable&lt;Integer, String&gt;, KStream&lt;Object, String&gt;&gt; yetAnotherProcess() {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>In this case, the binder will create three separate Kafka Streams objects with different application IDs (more on this below).
However, if you have more than one processor in the application, you have to tell Spring Cloud Stream which functions need to be activated.
Here is how you activate the functions.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess</code></p>
</div>
<div class="paragraph">
<p>If you do not want certain functions to be activated right away, you can remove them from this list.</p>
</div>
<div class="paragraph">
<p>This is also true when you have a single Kafka Streams processor and other types of <code>Function</code> beans in the same application that are handled through a different binder (for example, a function bean that is based on the regular Kafka Message Channel binder).</p>
</div>
</div>
<div class="sect3">
<h4 id="_kafka_streams_application_id"><a class="link" href="#_kafka_streams_application_id">Kafka Streams Application ID</a></h4>
<div class="paragraph">
<p>Application id is a mandatory property that you need to provide for a Kafka Streams application.
Spring Cloud Stream Kafka Streams binder allows you to configure this application id in multiple ways.</p>
</div>
<div class="paragraph">
<p>If you only have one single processor or <code>StreamListener</code> in the application, then you can set this at the binder level using the following property:</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.kafka.streams.binder.applicationId</code>.</p>
</div>
<div class="paragraph">
<p>As a convenience, if you only have a single processor, you can also set <code>spring.application.name</code>, and the binder will use its value as the application id.</p>
</div>
<div class="paragraph">
<p>If you have multiple Kafka Streams processors in the application, then you need to set the application id per processor.
In the case of the functional model, you can attach it to each function as a property.</p>
</div>
<div class="paragraph">
<p>For example, imagine that you have the following functions.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public java.util.function.Consumer&lt;KStream&lt;Object, String&gt;&gt; process() {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>and</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public java.util.function.Consumer&lt;KStream&lt;Object, String&gt;&gt; anotherProcess() {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Then you can set the application id for each, using the following binder level properties.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.kafka.streams.binder.functions.process.applicationId</code></p>
</div>
<div class="paragraph">
<p>and</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId</code></p>
</div>
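<div class="paragraph">
<p>As an illustrative sketch, the two properties above could be combined with the function definition in a single configuration as follows. The application id values <code>clicks-app</code> and <code>regions-app</code> are hypothetical placeholders; choose names that suit your deployment.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Activate both functions (needed when more than one processor is present)
spring.cloud.stream.function.definition=process;anotherProcess
# Hypothetical application ids, one per function
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=clicks-app
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId=regions-app</code></pre>
</div>
</div>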
<div class="paragraph">
<p>In the case of <code>StreamListener</code>, you need to set this on the first input binding on the processor.</p>
</div>
<div class="paragraph">
<p>For example, imagine that you have the following two <code>StreamListener</code> based processors.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@StreamListener
@SendTo("output")
public KStream&lt;String, String&gt; process(@Input("input") KStream&lt;Object, String&gt; input) {
...
}
@StreamListener
@SendTo("anotherOutput")
public KStream&lt;String, String&gt; anotherProcess(@Input("anotherInput") KStream&lt;Object, String&gt; input) {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Then you must set the application id for each of them using the following binding properties.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId</code></p>
</div>
<div class="paragraph">
<p>and</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId</code></p>
</div>
<div class="paragraph">
<p>This approach of setting the application id at the binding level also works for the function based model.
However, setting it per function at the binder level, as shown above, is much easier if you are using the functional model.</p>
</div>
<div class="paragraph">
<p>For production deployments, it is highly recommended to explicitly specify the application ID through configuration.
This is especially critical if you are auto scaling your application, in which case you need to make sure that you deploy each instance with the same application ID.</p>
</div>
<div class="paragraph">
<p>If the application does not provide an application ID, the binder will auto generate a static application ID for you.
This is convenient in development scenarios as it avoids the need for explicitly providing the application ID.
An application ID generated in this manner remains the same across application restarts.
In the case of the functional model, the generated application ID will be the function bean name followed by the literal <code>applicationId</code>, for example <code>process-applicationId</code> if <code>process</code> is the function bean name.
In the case of <code>StreamListener</code>, instead of using the function bean name, the generated application ID will use the containing class name followed by the method name followed by the literal <code>applicationId</code>.</p>
</div>
<div class="sect5">
<h6 id="_summary_of_setting_application_id"><a class="link" href="#_summary_of_setting_application_id">Summary of setting Application ID</a></h6>
<div class="ulist">
<ul>
<li>
<p>By default, the binder will auto generate the application ID per function or <code>StreamListener</code> method.</p>
</li>
<li>
<p>If you have a single processor, then you can use <code>spring.kafka.streams.applicationId</code>, <code>spring.application.name</code> or <code>spring.cloud.stream.kafka.streams.binder.applicationId</code>.</p>
</li>
<li>
<p>If you have multiple processors, then application ID can be set per function using the property - <code>spring.cloud.stream.kafka.streams.binder.functions.&lt;function-name&gt;.applicationId</code>.
In the case of <code>StreamListener</code>, this can be done using <code>spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId</code>, assuming that the input binding name is <code>input</code>.</p>
</li>
</ul>
</div>
</div>
</div>
<div class="sect3">
<h4 id="_overriding_the_default_binding_names_generated_by_the_binder_with_the_functional_style"><a class="link" href="#_overriding_the_default_binding_names_generated_by_the_binder_with_the_functional_style">Overriding the default binding names generated by the binder with the functional style</a></h4>
<div class="paragraph">
<p>By default, the binder uses the strategy discussed above to generate the binding names when using the functional style, that is, <code>&lt;function-bean-name&gt;-&lt;in&gt;|&lt;out&gt;-[0..n]</code>, for example <code>process-in-0</code>, <code>process-out-0</code>, and so on.
If you want to override those binding names, you can do that by specifying the following properties.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.function.bindings.&lt;default binding name&gt;</code>. Default binding name is the original binding name generated by the binder.</p>
</div>
<div class="paragraph">
<p>For example, let&#8217;s say you have this function.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public BiFunction&lt;KStream&lt;String, Long&gt;, KTable&lt;String, String&gt;, KStream&lt;String, Long&gt;&gt; process() {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The binder will generate bindings with the names <code>process-in-0</code>, <code>process-in-1</code> and <code>process-out-0</code>.
Now, if you want to change them to something else completely, such as more domain specific binding names, then you can do so as below.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.function.bindings.process-in-0=users</code></p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.function.bindings.process-in-1=regions</code></p>
</div>
<div class="paragraph">
<p>and</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.function.bindings.process-out-0=clicks</code></p>
</div>
<div class="paragraph">
<p>After that, you must set all the binding level properties on these new binding names.</p>
</div>
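<div class="paragraph">
<p>For instance, assuming the three bindings have been renamed to <code>users</code>, <code>regions</code> and <code>clicks</code> as above, the destinations would then be configured against the new names. The topic names in this sketch are hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Binding properties now refer to the overridden binding names
# (the topic names are placeholders)
spring.cloud.stream.bindings.users.destination=user-clicks-topic
spring.cloud.stream.bindings.regions.destination=user-regions-topic
spring.cloud.stream.bindings.clicks.destination=clicks-per-region-topic</code></pre>
</div>
</div>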
<div class="paragraph">
<p>Please keep in mind that with the functional programming model described above, adhering to the default binding names makes sense in most situations.
The only reason you may still want to override them is when you have a large number of configuration properties and you want to map the bindings to something more domain friendly.</p>
</div>
</div>
<div class="sect3">
<h4 id="_setting_up_bootstrap_server_configuration"><a class="link" href="#_setting_up_bootstrap_server_configuration">Setting up bootstrap server configuration</a></h4>
<div class="paragraph">
<p>When running Kafka Streams applications, you must provide the Kafka broker server information.
If you don&#8217;t provide this information, the binder expects that you are running the broker at the default <code>localhost:9092</code>.
If that is not the case, then you need to override that. There are a couple of ways to do that.</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Using the boot property - <code>spring.kafka.bootstrapServers</code></p>
</li>
<li>
<p>Binder level property - <code>spring.cloud.stream.kafka.streams.binder.brokers</code></p>
</li>
</ul>
</div>
<div class="paragraph">
<p>For the binder level property, you can also use the broker property of the regular Kafka binder, <code>spring.cloud.stream.kafka.binder.brokers</code>.
The Kafka Streams binder first checks whether the Kafka Streams binder specific broker property (<code>spring.cloud.stream.kafka.streams.binder.brokers</code>) is set and, if it is not found, looks for <code>spring.cloud.stream.kafka.binder.brokers</code>.</p>
</div>
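<div class="paragraph">
<p>A minimal sketch of the binder level property, assuming a cluster with two brokers, might look like this. The host names are hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Hypothetical broker addresses, comma separated
spring.cloud.stream.kafka.streams.binder.brokers=broker-1.example.com:9092,broker-2.example.com:9092</code></pre>
</div>
</div>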
</div>
</div>
<div class="sect2">
<h3 id="_record_serialization_and_deserialization"><a class="link" href="#_record_serialization_and_deserialization">Record serialization and deserialization</a></h3>
<div class="paragraph">
<p>Kafka Streams binder allows you to serialize and deserialize records in two ways.
One is the native serialization and deserialization facilities provided by Kafka and the other is the message conversion capabilities of the Spring Cloud Stream framework.
Let&#8217;s look at some details.</p>
</div>
<div class="sect3">
<h4 id="_inbound_deserialization"><a class="link" href="#_inbound_deserialization">Inbound deserialization</a></h4>
<div class="paragraph">
<p>Keys are always deserialized using native Serdes.</p>
</div>
<div class="paragraph">
<p>For values, by default, deserialization on the inbound is natively performed by Kafka.
Please note that this is a major change in default behavior from previous versions of Kafka Streams binder, where the deserialization was done by the framework.</p>
</div>
<div class="paragraph">
<p>Kafka Streams binder will try to infer matching <code>Serde</code> types by looking at the type signature of <code>java.util.function.Function|Consumer</code> or <code>StreamListener</code>.
Here is the order in which it matches the Serdes.</p>
</div>
<div class="ulist">
<ul>
<li>
<p>If the application provides a bean of type <code>Serde</code> and if the return type is parameterized with the actual type of the incoming key or value type, then it will use that <code>Serde</code> for inbound deserialization.
For example, if you have the following in the application, the binder detects that the incoming value type for the <code>KStream</code> matches a type that is parameterized on a <code>Serde</code> bean.
It will use that for inbound deserialization.</p>
</li>
</ul>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Serde&lt;Foo&gt; customSerde() {
...
}
@Bean
public Function&lt;KStream&lt;String, Foo&gt;, KStream&lt;String, Foo&gt;&gt; process() {
}</code></pre>
</div>
</div>
<div class="ulist">
<ul>
<li>
<p>Next, it looks at the types and sees if they are one of the types exposed by Kafka Streams. If so, it uses the corresponding Serdes.
Here are the Serde types that the binder will try to match from Kafka Streams.</p>
<div class="literalblock">
<div class="content">
<pre>Integer, Long, Short, Double, Float, byte[], UUID and String.</pre>
</div>
</div>
</li>
<li>
<p>If none of the Serdes provided by Kafka Streams match the types, then it will use the <code>JsonSerde</code> provided by Spring Kafka. In this case, the binder assumes that the types are JSON friendly.
This is useful if you have multiple value objects as inputs since the binder will internally infer them to correct Java types.
Before falling back to the <code>JsonSerde</code> though, the binder checks the default <code>Serde</code>s set in the Kafka Streams configuration to see if one of them is a <code>Serde</code> that it can match with the incoming KStream&#8217;s types.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>If none of the above strategies work, then the application must provide the <code>Serde</code>s through configuration.
This can be configured in two ways: at the binding level or as a default.</p>
</div>
<div class="paragraph">
<p>First, the binder checks whether a <code>Serde</code> is provided at the binding level.
For example, if you have the following processor,</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public BiFunction&lt;KStream&lt;CustomKey, AvroIn1&gt;, KTable&lt;CustomKey, AvroIn2&gt;, KStream&lt;CustomKey, AvroOutput&gt;&gt; process() {...}</code></pre>
</div>
</div>
<div class="paragraph">
<p>then, you can provide a binding level <code>Serde</code> using the following:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde</code></pre>
</div>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
If you provide a <code>Serde</code> per input binding as above, it takes higher precedence and the binder will not attempt any <code>Serde</code> inference.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>If you want the default key/value Serdes to be used for inbound deserialization, you can do so at the binder level.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde</code></pre>
</div>
</div>
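<div class="paragraph">
<p>As a minimal sketch, assuming the inbound records have <code>String</code> keys and <code>Long</code> values, the two properties could be set to the fully qualified names of the corresponding <code>Serde</code> classes that ship with Kafka. The choice of types here is an assumption made for illustration only.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Assumes String keys and Long values; substitute the Serde classes that match your data
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$LongSerde</code></pre>
</div>
</div>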
<div class="paragraph">
<p>If you don&#8217;t want the native decoding provided by Kafka, you can rely on the message conversion features that Spring Cloud Stream provides.
Since native decoding is the default, in order to let Spring Cloud Stream deserialize the inbound value object, you need to explicitly disable native decoding.</p>
</div>
<div class="paragraph">
<p>For example, if you have the same <code>BiFunction</code> processor as above, you can set <code>spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false</code>.
You need to disable native decoding for each input individually. Otherwise, native decoding will still be applied to those you do not disable.</p>
</div>
<div class="paragraph">
<p>By default, Spring Cloud Stream will use <code>application/json</code> as the content type and use an appropriate JSON message converter.
You can use custom message converters by using the following property and an appropriate <code>MessageConverter</code> bean.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.bindings.process-in-0.contentType</code></pre>
</div>
</div>
</div>
<div class="sect3">
<h4 id="_outbound_serialization"><a class="link" href="#_outbound_serialization">Outbound serialization</a></h4>
<div class="paragraph">
<p>Outbound serialization pretty much follows the same rules as above for inbound deserialization.
As with the inbound deserialization, one major change from the previous versions of Spring Cloud Stream is that the serialization on the outbound is handled by Kafka natively.
Before 3.0 versions of the binder, this was done by the framework itself.</p>
</div>
<div class="paragraph">
<p>Keys on the outbound are always serialized by Kafka using a matching <code>Serde</code> that is inferred by the binder.
If it can&#8217;t infer the type of the key, then that needs to be specified using configuration.</p>
</div>
<div class="paragraph">
<p>Value serdes are inferred using the same rules used for inbound deserialization.
First it matches to see if the outbound type is from a provided bean in the application.
If not, it checks to see if it matches with a <code>Serde</code> exposed by Kafka such as - <code>Integer</code>, <code>Long</code>, <code>Short</code>, <code>Double</code>, <code>Float</code>, <code>byte[]</code>, <code>UUID</code> and <code>String</code>.
If that doesn&#8217;t work, then it falls back to the <code>JsonSerde</code> provided by the Spring Kafka project, but first looks at the default <code>Serde</code> configuration to see if there is a match.
Keep in mind that all of this happens transparently to the application.
If none of these work, then the user has to provide the <code>Serde</code> to use by configuration.</p>
</div>
<div class="paragraph">
<p>Let&#8217;s say you are using the same <code>BiFunction</code> processor as above. Then you can configure outbound key/value Serdes as follows.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde</code></pre>
</div>
</div>
<div class="paragraph">
<p>If Serde inference fails, and no binding level Serdes are provided, then the binder falls back to the <code>JsonSerde</code>, but first looks at the default Serdes for a match.</p>
</div>
<div class="paragraph">
<p>Default Serdes are configured in the same way as described above for deserialization.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde</code>
<code>spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde</code></p>
</div>
<div class="paragraph">
<p>If your application uses the branching feature and has multiple output bindings, then these have to be configured per binding.
Once again, if the binder is capable of inferring the <code>Serde</code> types, you don&#8217;t need to do this configuration.</p>
</div>
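<div class="paragraph">
<p>As a sketch, assume a hypothetical function bean named <code>process</code> that returns an array of three <code>KStream</code> branches whose value types the binder cannot infer. The outbound value <code>Serde</code> would then be set on each output binding separately. <code>JsonSerde</code> from Spring Kafka is used here purely for illustration.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># One entry per output binding produced by the branching function (names assume the bean is called "process")
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.bindings.process-out-1.producer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.bindings.process-out-2.producer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde</code></pre>
</div>
</div>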
<div class="paragraph">
<p>If you don&#8217;t want the native encoding provided by Kafka, but want to use the framework provided message conversion, then you need to explicitly disable native encoding since native encoding is the default.
For example, if you have the same <code>BiFunction</code> processor as above, you can set <code>spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false</code>.
In the case of branching, you need to disable native encoding for each output individually. Otherwise, native encoding will still be applied to those you don&#8217;t disable.</p>
</div>
<div class="paragraph">
<p>When conversion is done by Spring Cloud Stream, by default, it will use <code>application/json</code> as the content type and use an appropriate JSON message converter.
You can use custom message converters by using the following property and a corresponding <code>MessageConverter</code> bean.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.bindings.process-out-0.contentType</code></pre>
</div>
</div>
<div class="paragraph">
<p>When native encoding/decoding is disabled, the binder will not do any inference as it does in the case of native Serdes.
Applications need to explicitly provide all the configuration options.
For that reason, it is generally advised to stay with the default options for de/serialization and stick with native de/serialization provided by Kafka Streams when you write Spring Cloud Stream Kafka Streams applications.
The one scenario in which you must use message conversion capabilities provided by the framework is when your upstream producer is using a specific serialization strategy.
In that case, you want to use a matching deserialization strategy as native mechanisms may fail.
When relying on the default <code>Serde</code> mechanism, the applications must ensure that the binder has a way to correctly map the inbound and outbound types to a proper <code>Serde</code>, as otherwise things might fail.</p>
</div>
<div class="paragraph">
<p>It is worth mentioning that the data de/serialization approaches outlined above are only applicable on the edges of your processors, i.e. inbound and outbound.
Your business logic might still need to call Kafka Streams APIs that explicitly need <code>Serde</code> objects.
Those are still the responsibility of the application and must be handled accordingly by the developer.</p>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_error_handling"><a class="link" href="#_error_handling">Error Handling</a></h3>
<div class="paragraph">
<p>Apache Kafka Streams provides the capability for natively handling exceptions from deserialization errors.
For details on this support, please see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers">this</a>.
Out of the box, Apache Kafka Streams provides two kinds of deserialization exception handlers - <code>LogAndContinueExceptionHandler</code> and <code>LogAndFailExceptionHandler</code>.
As the names indicate, the former will log the error and continue processing the next records and the latter will log the error and fail. <code>LogAndFailExceptionHandler</code> is the default deserialization exception handler.</p>
</div>
<div class="sect3">
<h4 id="_handling_deserialization_exceptions_in_the_binder"><a class="link" href="#_handling_deserialization_exceptions_in_the_binder">Handling Deserialization Exceptions in the Binder</a></h4>
<div class="paragraph">
<p>Kafka Streams binder allows you to specify the deserialization exception handlers above using the following property.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue</code></pre>
</div>
</div>
<div class="paragraph">
<p>or</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail</code></pre>
</div>
</div>
<div class="paragraph">
<p>In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous records (poison pills) to a DLQ (dead letter queue) topic.
Here is how you enable this DLQ exception handler.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq</code></pre>
</div>
</div>
<div class="paragraph">
<p>When the above property is set, all records that fail deserialization are automatically sent to the DLQ topic.</p>
</div>
<div class="paragraph">
<p>You can set the topic name where the DLQ messages are published as below.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)</code></pre>
</div>
</div>
<div class="paragraph">
<p>If this is set, then the error records are sent to the topic <code>custom-dlq</code>.
If this is not set, then it will create a DLQ topic with the name <code>error.&lt;input-topic-name&gt;.&lt;application-id&gt;</code>.
For instance, if your binding&#8217;s destination topic is <code>inputTopic</code> and the application ID is <code>process-applicationId</code>, then the default DLQ topic is <code>error.inputTopic.process-applicationId</code>.
It is always recommended to explicitly create a DLQ topic for each input binding if it is your intention to enable DLQ.</p>
</div>
</div>
<div class="sect3">
<h4 id="_dlq_per_input_consumer_binding"><a class="link" href="#_dlq_per_input_consumer_binding">DLQ per input consumer binding</a></h4>
<div class="paragraph">
<p>The property <code>spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler</code> is applicable for the entire application.
This implies that if there are multiple functions or <code>StreamListener</code> methods in the same application, this property is applied to all of them.
However, if you have multiple processors or multiple input bindings within a single processor, then you can use the finer-grained DLQ control that the binder provides per input consumer binding.</p>
</div>
<div class="paragraph">
<p>If you have the following processor,</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public BiFunction&lt;KStream&lt;String, Long&gt;, KTable&lt;String, String&gt;, KStream&lt;String, Long&gt;&gt; process() {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>and you only want to enable DLQ on the first input binding and <code>logAndContinue</code> on the second binding, then you can do so on the consumer as below.</p>
</div>
<div class="paragraph">
<p><code>spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq</code>
<code>spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndContinue</code></p>
</div>
<div class="paragraph">
<p>Setting deserialization exception handlers this way has a higher precedence than setting at the binder level.</p>
</div>
</div>
<div class="sect3">
<h4 id="_dlq_partitioning"><a class="link" href="#_dlq_partitioning">DLQ partitioning</a></h4>
<div class="paragraph">
<p>By default, records are published to the Dead-Letter topic using the same partition as the original record.
This means the Dead-Letter topic must have at least as many partitions as the original topic.</p>
</div>
<div class="paragraph">
<p>To change this behavior, add a <code>DlqPartitionFunction</code> implementation as a <code>@Bean</code> to the application context.
Only one such bean can be present.
The function is provided with the consumer group (which is the same as the application ID in most situations), the failed <code>ConsumerRecord</code> and the exception.
For example, if you always want to route to partition 0, you might use:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -&gt; 0;
}</code></pre>
</div>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
If you set a consumer binding&#8217;s <code>dlqPartitions</code> property to 1 (and the binder&#8217;s <code>minPartitionCount</code> is equal to <code>1</code>), there is no need to supply a <code>DlqPartitionFunction</code>; the framework will always use partition 0.
If you set a consumer binding&#8217;s <code>dlqPartitions</code> property to a value greater than <code>1</code> (or the binder&#8217;s <code>minPartitionCount</code> is greater than <code>1</code>), you <strong>must</strong> provide a <code>DlqPartitionFunction</code> bean, even if the partition count is the same as the original topic&#8217;s.
</td>
</tr>
</table>
</div>
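<div class="paragraph">
<p>As a sketch of the single-partition case described in the note, the configuration might look like the following. The binding name <code>process-in-0</code> is a hypothetical example, and the property paths are assumed to follow the same pattern as the other consumer and binder properties shown in this document.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Assumption: process-in-0 is the consumer binding that publishes to the DLQ
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqPartitions=1
spring.cloud.stream.kafka.streams.binder.minPartitionCount=1</code></pre>
</div>
</div>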
<div class="paragraph">
<p>Keep the following in mind when using the exception handling feature in the Kafka Streams binder:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>The property <code>spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler</code> is applicable for the entire application.
This implies that if there are multiple functions or <code>StreamListener</code> methods in the same application, this property is applied to all of them.</p>
</li>
<li>
<p>Exception handling for deserialization works consistently with both native deserialization and framework-provided message conversion.</p>
</li>
</ul>
</div>
</div>
<div class="sect3">
<h4 id="_handling_production_exceptions_in_the_binder"><a class="link" href="#_handling_production_exceptions_in_the_binder">Handling Production Exceptions in the Binder</a></h4>
<div class="paragraph">
<p>Unlike the support for deserialization exception handlers described above, the binder does not provide a first-class mechanism for handling production exceptions.
However, you can still configure production exception handlers by using the <code>StreamsBuilderFactoryBean</code> customizer, which is described in a later section.</p>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_state_store"><a class="link" href="#_state_store">State Store</a></h3>
<div class="paragraph">
<p>Kafka Streams creates state stores automatically when the high-level DSL is used and the application makes calls that require a state store.</p>
</div>
<div class="paragraph">
<p>If you want to materialize an incoming <code>KTable</code> binding as a named state store, then you can do so by using the following strategy.</p>
</div>
<div class="paragraph">
<p>Let&#8217;s say you have the following function.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public BiFunction&lt;KStream&lt;String, Long&gt;, KTable&lt;String, String&gt;, KStream&lt;String, Long&gt;&gt; process() {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Then, by setting the following property, the incoming <code>KTable</code> data will be materialized into the named state store.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store</code></pre>
</div>
</div>
<div class="paragraph">
<p>You can define custom state stores as beans in your application; the binder detects them and adds them to the Kafka Streams builder.
This is especially relevant when the processor API is used, because you then need to register a state store manually.
To do so, create a <code>StoreBuilder</code> for the state store as a bean in the application.
Here are examples of defining such beans.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>These state stores can then be accessed by the applications directly.</p>
</div>
<div class="paragraph">
<p>During the bootstrap, the above beans will be processed by the binder and passed on to the Streams builder object.</p>
</div>
<div class="paragraph">
<p>Accessing the state store:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>Processor&lt;Object, Product&gt;() {

    WindowStore&lt;Long, Long&gt; state;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext processorContext) {
        // Look up the store by the name it was registered under ("other-store" above).
        state = (WindowStore&lt;Long, Long&gt;) processorContext.getStateStore("other-store");
    }
    ...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>This will not work when it comes to registering global state stores.
In order to register a global state store, please see the section below on customizing <code>StreamsBuilderFactoryBean</code>.</p>
</div>
</div>
<div class="sect2">
<h3 id="_interactive_queries"><a class="link" href="#_interactive_queries">Interactive Queries</a></h3>
<div class="paragraph">
<p>The Kafka Streams binder API exposes a class called <code>InteractiveQueryService</code> for interactively querying the state stores.
You can access it as a Spring bean in your application. An easy way to do so is to <code>autowire</code> the bean.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Autowired
private InteractiveQueryService interactiveQueryService;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Once you have access to this bean, you can query the particular state store that you are interested in. See below.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>ReadOnlyKeyValueStore&lt;Object, Object&gt; keyValueStore =
        interactiveQueryService.getQueryableStore("my-store", QueryableStoreTypes.keyValueStore());</code></pre>
</div>
</div>
<div class="paragraph">
<p>During startup, the above method call to retrieve the store might fail,
for example because the state store is still being initialized.
In such cases, it is useful to retry the operation.
The Kafka Streams binder provides a simple retry mechanism to accommodate this.</p>
</div>
<div class="paragraph">
<p>The following two properties control this retry behavior.</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts</code> - Default is <code>1</code>.</p>
</li>
<li>
<p><code>spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval</code> - Default is <code>1000</code> milliseconds.</p>
</li>
</ul>
</div>
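<div class="paragraph">
<p>For instance, to allow a few more attempts with a longer pause between them, the two properties could be set as follows. The values <code>5</code> and <code>2000</code> are arbitrary illustrations, not recommendations.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Illustrative values only: retry up to 5 times, waiting 2000 milliseconds between attempts
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts=5
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval=2000</code></pre>
</div>
</div>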
<div class="paragraph">
<p>If there are multiple instances of the Kafka Streams application running, then before you can query them interactively, you need to identify which application instance hosts the particular key that you are querying.
The <code>InteractiveQueryService</code> API provides methods for identifying the host information.</p>
</div>
<div class="paragraph">
<p>In order for this to work, you must configure the property <code>application.server</code> as below:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.binder.configuration.application.server: &lt;server&gt;:&lt;port&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Here are some code snippets:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}</code></pre>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_health_indicator"><a class="link" href="#_health_indicator">Health Indicator</a></h3>
<div class="paragraph">
<p>The health indicator requires the dependency <code>spring-boot-starter-actuator</code>. For Maven, use:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-xml hljs" data-lang="xml">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
&lt;artifactId&gt;spring-boot-starter-actuator&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
</div>
</div>
<div class="paragraph">
<p>Spring Cloud Stream Kafka Streams Binder provides a health indicator to check the state of the underlying streams threads.
Spring Cloud Stream defines a property <code>management.health.binders.enabled</code> to enable the health indicator. See the
<a href="https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_health_indicator">Spring Cloud Stream documentation</a>.</p>
</div>
<div class="paragraph">
<p>The health indicator provides the following details for each stream thread&#8217;s metadata:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Thread name</p>
</li>
<li>
<p>Thread state: <code>CREATED</code>, <code>RUNNING</code>, <code>PARTITIONS_REVOKED</code>, <code>PARTITIONS_ASSIGNED</code>, <code>PENDING_SHUTDOWN</code> or <code>DEAD</code></p>
</li>
<li>
<p>Active tasks: task ID and partitions</p>
</li>
<li>
<p>Standby tasks: task ID and partitions</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>By default, only the global status is visible (<code>UP</code> or <code>DOWN</code>). To show the details, the property <code>management.endpoint.health.show-details</code> must be set to <code>ALWAYS</code> or <code>WHEN_AUTHORIZED</code>.
For more details about the health information, see the
<a href="https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-endpoints.html#production-ready-health">Spring Boot Actuator documentation</a>.</p>
</div>
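<div class="paragraph">
<p>Putting the two properties mentioned above together, a configuration that enables the binder health indicator and always shows the per-thread details might look like this sketch. Choosing <code>ALWAYS</code> over <code>WHEN_AUTHORIZED</code> is an assumption made for illustration.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>management.health.binders.enabled=true
# Assumption: details are shown to every caller; use WHEN_AUTHORIZED to restrict them
management.endpoint.health.show-details=ALWAYS</code></pre>
</div>
</div>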
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<i class="fa icon-note" title="Note"></i>
</td>
<td class="content">
The status of the health indicator is <code>UP</code> if all the Kafka threads registered are in the <code>RUNNING</code> state.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>Since there are three individual binders in Kafka Streams binder (<code>KStream</code>, <code>KTable</code> and <code>GlobalKTable</code>), all of them will report the health status.
When enabling <code>show-details</code>, some of the information reported may be redundant.</p>
</div>
<div class="paragraph">
<p>When there are multiple Kafka Streams processors present in the same application, then the health checks will be reported for all of them and will be categorized by the application ID of Kafka Streams.</p>
</div>
</div>
<div class="sect2">
<h3 id="_accessing_kafka_streams_metrics"><a class="link" href="#_accessing_kafka_streams_metrics">Accessing Kafka Streams Metrics</a></h3>
<div class="paragraph">
<p>Spring Cloud Stream Kafka Streams binder provides a basic mechanism for accessing Kafka Streams metrics exported through a Micrometer <code>MeterRegistry</code>.
Kafka Streams metrics that are available through <code>KafkaStreams#metrics()</code> are exported to this meter registry by the binder.
The metrics exported are from the consumers, producers, admin-client and the stream itself.</p>
</div>
<div class="paragraph">
<p>The binder exports each metric under a name made up of the metric group name, followed by a dot, followed by the actual metric name.
All dashes in the original metric information are replaced with dots.</p>
</div>
<div class="paragraph">
<p>For example, the metric name <code>network-io-total</code> from the metric group <code>consumer-metrics</code> is available in the Micrometer registry as <code>consumer.metrics.network.io.total</code>.
Similarly, the metric <code>commit-total</code> from <code>stream-metrics</code> is available as <code>stream.metrics.commit.total</code>.</p>
</div>
<div class="paragraph">
<p>If you have multiple Kafka Streams processors in the same application, the metric name is prefixed with the application ID of the corresponding Kafka Streams processor.
The application ID is preserved as is; its dashes are not converted to dots.
For example, if the application ID of the first processor is <code>processor-1</code>, then the metric name <code>network-io-total</code> from the metric group <code>consumer-metrics</code> is available in the Micrometer registry as <code>processor-1.consumer.metrics.network.io.total</code>.</p>
</div>
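<div class="paragraph">
<p>The naming rule described above can be sketched in plain Java. This is only an illustration of the rule as stated in this section, not the binder&#8217;s actual implementation; the class <code>MetricNameSketch</code> and the method <code>micrometerName</code> are hypothetical names.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">class MetricNameSketch {

    // Hypothetical helper, not part of the binder API.
    // Builds the name under which a Kafka Streams metric would appear in the registry,
    // following the rule described in this section.
    static String micrometerName(String applicationId, String group, String name) {
        // Dashes in the group and metric names become dots.
        String base = group.replace('-', '.') + "." + name.replace('-', '.');
        // With multiple processors, the application ID is prefixed unchanged.
        return applicationId == null ? base : applicationId + "." + base;
    }

    public static void main(String[] args) {
        System.out.println(micrometerName(null, "consumer-metrics", "network-io-total"));
        System.out.println(micrometerName("processor-1", "consumer-metrics", "network-io-total"));
    }
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Running <code>main</code> prints <code>consumer.metrics.network.io.total</code> and <code>processor-1.consumer.metrics.network.io.total</code>, which match the examples given above.</p>
</div>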
<div class="paragraph">
<p>You can either access the Micrometer <code>MeterRegistry</code> programmatically in the application and iterate through the available gauges, or use Spring Boot Actuator to access the metrics through a REST endpoint.
When accessing the metrics through the Boot Actuator endpoint, make sure to add <code>metrics</code> to the property <code>management.endpoints.web.exposure.include</code>.
Then you can access <code>/actuator/metrics</code> to get a list of all the available metrics, each of which can be accessed individually through the same URI (<code>/actuator/metrics/&lt;metric-name&gt;</code>).</p>
</div>
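<div class="paragraph">
<p>A minimal configuration for the REST-based approach might look like the following sketch. Exposing <code>health</code> alongside <code>metrics</code> is an assumption made here so that the health endpoint stays reachable over HTTP; only <code>metrics</code> is required for this section.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Assumption: health is listed as well so that it remains exposed over HTTP
management.endpoints.web.exposure.include=health,metrics</code></pre>
</div>
</div>
<div class="paragraph">
<p>With this in place, a metric such as the one from the earlier example should be reachable at <code>/actuator/metrics/consumer.metrics.network.io.total</code>.</p>
</div>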
<div class="paragraph">
<p>Anything beyond the info-level metrics available through <code>KafkaStreams#metrics()</code> (for example, the debug-level metrics) is still only available through JMX after you set <code>metrics.recording.level</code> to <code>DEBUG</code>.
Kafka Streams, by default, sets this level to <code>INFO</code>.
See <a href="https://kafka.apache.org/documentation/#kafka_streams_monitoring">this section</a> of the Kafka Streams documentation for more details.
In a future release, the binder may support exporting these DEBUG-level metrics through Micrometer.</p>
</div>
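<div class="paragraph">
<p>One way to set the recording level is through the binder&#8217;s <code>configuration</code> properties, in the same way <code>application.server</code> is set earlier in this document. This is a sketch that assumes the Kafka Streams setting is passed through under that prefix.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Assumption: Kafka Streams settings are passed through the binder's configuration prefix
spring.cloud.stream.kafka.streams.binder.configuration.metrics.recording.level=DEBUG</code></pre>
</div>
</div>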
</div>
<div class="sect2">
<h3 id="_mixing_high_level_dsl_and_low_level_processor_api"><a class="link" href="#_mixing_high_level_dsl_and_low_level_processor_api">Mixing high level DSL and low level Processor API</a></h3>
<div class="paragraph">
<p>Kafka Streams provides two variants of APIs.
It has a higher-level, DSL-like API in which you can chain various operations in a way that may be familiar to many functional programmers.
Kafka Streams also gives access to a low-level Processor API.
The processor API is very powerful and lets you control things at a much lower level, but it is imperative in nature.
The Kafka Streams binder for Spring Cloud Stream allows you to use either the high-level DSL alone or a mix of the DSL and the processor API.
Mixing both of these variants gives you a lot of options to control various use cases in an application.
Applications can use the <code>transform</code> or <code>process</code> method API calls to get access to the processor API.</p>
</div>
<div class="paragraph">
<p>Here is a look at how one may combine both the DSL and the processor API in a Spring Cloud Stream application using the <code>process</code> API.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Consumer&lt;KStream&lt;Object, String&gt;&gt; process() {
    return input -&gt;
        input.process(() -&gt; new Processor&lt;Object, String&gt;() {

            // Keep the context so that process() can use it, for example to reach state stores.
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {
            }
        });
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Here is an example using the <code>transform</code> API.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Consumer&lt;KStream&lt;Object, String&gt;&gt; process() {
    return input -&gt;
        input.transform(() -&gt; new Transformer&lt;Object, String, KeyValue&lt;Object, String&gt;&gt;() {
            @Override
            public void init(ProcessorContext context) {
            }

            @Override
            public void close() {
            }

            @Override
            public KeyValue&lt;Object, String&gt; transform(Object key, String value) {
                // business logic - return the transformed key/value pair
                return KeyValue.pair(key, value);
            }
        });
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The <code>process</code> API method call is a terminal operation, while the <code>transform</code> API is non-terminal and gives you a potentially transformed <code>KStream</code>, on which you can continue further processing using either the DSL or the processor API.</p>
</div>
</div>
<div class="sect2">
<h3 id="_partition_support_on_the_outbound"><a class="link" href="#_partition_support_on_the_outbound">Partition support on the outbound</a></h3>
<div class="paragraph">
<p>A Kafka Streams processor usually sends the processed output into an outbound Kafka topic.
If the outbound topic is partitioned and the processor needs to send the outgoing data into particular partitions, the application needs to provide a bean of type <code>StreamPartitioner</code>.
See <a href="https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/processor/StreamPartitioner.html">StreamPartitioner</a> for more details.
Let&#8217;s see some examples.</p>
</div>
<div class="paragraph">
<p>This is the same processor we have already seen multiple times:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Function&lt;KStream&lt;Object, String&gt;, KStream&lt;?, WordCount&gt;&gt; process() {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Here is the output binding destination:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.bindings.process-out-0.destination: outputTopic</code></pre>
</div>
</div>
<div class="paragraph">
<p>Suppose the topic <code>outputTopic</code> has 4 partitions. If you don&#8217;t provide a partitioning strategy, Kafka Streams uses its default partitioning strategy, which may not be the outcome you want, depending on the particular use case.
Let&#8217;s say you want to send any key that matches <code>spring</code> to partition 0, <code>cloud</code> to partition 1, <code>stream</code> to partition 2, and everything else to partition 3.
This is what you need to do in the application.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public StreamPartitioner&lt;String, WordCount&gt; streamPartitioner() {
return (t, k, v, n) -&gt; {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>This is a rudimentary implementation. However, you have access to the key and value of the record, the topic name and the total number of partitions.
Therefore, you can implement complex partitioning strategies if need be.</p>
</div>
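<div class="paragraph">
<p>As one illustration of a strategy that uses the total number of partitions, the following plain-Java sketch spreads keys across all partitions by hash. The class <code>PartitionSketch</code> and the method <code>partitionFor</code> are hypothetical names, and sending records without a key to partition 0 is an assumption made for this example.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">class PartitionSketch {

    // Hypothetical helper: maps a key to a partition in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // Assumption for this sketch: records without a key go to partition 0.
            return 0;
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Such a helper could be called from the body of a <code>StreamPartitioner</code> lambda, for example <code>(t, k, v, n) -&gt; PartitionSketch.partitionFor(k, n)</code>.</p>
</div>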
<div class="paragraph">
<p>You also need to provide the name of this bean in the application configuration.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner</code></pre>
</div>
</div>
<div class="paragraph">
<p>Each output topic in the application needs to be configured separately like this.</p>
</div>
</div>
<div class="sect2">
<h3 id="_streamsbuilderfactorybean_customizer"><a class="link" href="#_streamsbuilderfactorybean_customizer">StreamsBuilderFactoryBean customizer</a></h3>
<div class="paragraph">
<p>It is often required to customize the <code>StreamsBuilderFactoryBean</code> that creates the <code>KafkaStreams</code> objects.
Based on the underlying support provided by Spring Kafka, the binder allows you to customize the <code>StreamsBuilderFactoryBean</code>.
You can use the <code>StreamsBuilderFactoryBeanCustomizer</code> to customize the <code>StreamsBuilderFactoryBean</code> itself.
Then, once you get access to the <code>StreamsBuilderFactoryBean</code> through this customizer, you can customize the corresponding <code>KafkaStreams</code> using <code>KafkaStreamsCustomizer</code>.
Both of these customizers are part of the Spring for Apache Kafka project.</p>
</div>
<div class="paragraph">
<p>Here is an example of using the <code>StreamsBuilderFactoryBeanCustomizer</code>.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -&gt; sfb.setStateListener((newState, oldState) -&gt; {
//Do some action here!
});
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>The above is shown as an illustration of the things you can do to customize the <code>StreamsBuilderFactoryBean</code>.
You can essentially call any available mutation operations from <code>StreamsBuilderFactoryBean</code> to customize it.
This customizer will be invoked by the binder right before the factory bean is started.</p>
</div>
<div class="paragraph">
<p>Once you get access to the <code>StreamsBuilderFactoryBean</code>, you can also customize the underlying <code>KafkaStreams</code> object.
Here is a blueprint for doing so.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -&gt; {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -&gt; {
});
}
});
};
}</code></pre>
</div>
</div>
<div class="paragraph">
<p><code>KafkaStreamsCustomizer</code> will be called by the <code>StreamsBuilderFactoryBean</code> right before the underlying <code>KafkaStreams</code> gets started.</p>
</div>
<div class="paragraph">
<p>There can only be one <code>StreamsBuilderFactoryBeanCustomizer</code> in the entire application.
How, then, do we account for multiple Kafka Streams processors, given that each of them is backed by its own <code>StreamsBuilderFactoryBean</code> object?
In that case, if the customization needs to be different for those processors, the application needs to apply a filter based on the application ID.</p>
</div>
<div class="paragraph">
<p>For example:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -&gt; {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -&gt; {
});
}
});
}
};
}</code></pre>
</div>
</div>
<div class="sect3">
<h4 id="_using_customizer_to_register_a_global_state_store"><a class="link" href="#_using_customizer_to_register_a_global_state_store">Using Customizer to register a global state store</a></h4>
<div class="paragraph">
<p>As mentioned above, the binder does not provide a first class way to register global state stores as a feature.
For that, you need to use the customizer.
Here is how that can be done.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -&gt; {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Again, if you have multiple processors, you want to attach the global state store to the right <code>StreamsBuilder</code> by filtering out the other <code>StreamsBuilderFactoryBean</code> objects using the application id as outlined above.</p>
</div>
</div>
<div class="sect3">
<h4 id="_using_customizer_to_register_a_production_exception_handler"><a class="link" href="#_using_customizer_to_register_a_production_exception_handler">Using customizer to register a production exception handler</a></h4>
<div class="paragraph">
<p>In the error handling section, we indicated that the binder does not provide a first class way to deal with production exceptions.
Even so, you can still use the <code>StreamsBuilderFactoryBean</code> customizer to register production exception handlers. See below.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -&gt; {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Once again, if you have multiple processors, you may want to set it appropriately against the correct <code>StreamsBuilderFactoryBean</code>.
You may also add such production exception handlers using the configuration property (See below for more on that), but this is an option if you choose to go with a programmatic approach.</p>
</div>
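<div class="paragraph">
<p>As a sketch of the property-based alternative, the handler class could be supplied through the binder&#8217;s <code>configuration</code> properties using the Kafka Streams setting <code>default.production.exception.handler</code>. The package <code>com.example</code> is a hypothetical placeholder for wherever your handler class lives.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Assumption: com.example is a placeholder package for the application's handler class
spring.cloud.stream.kafka.streams.binder.configuration.default.production.exception.handler=com.example.CustomProductionExceptionHandler</code></pre>
</div>
</div>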
</div>
</div>
<div class="sect2">
<h3 id="_timestamp_extractor"><a class="link" href="#_timestamp_extractor">Timestamp extractor</a></h3>
<div class="paragraph">
<p>Kafka Streams allows you to control the processing of the consumer records based on various notions of timestamp.
By default, Kafka Streams extracts the timestamp metadata embedded in the consumer record.
You can change this default behavior by providing a different <code>TimestampExtractor</code> implementation per input binding.
Here are some details on how that can be done.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Function&lt;KStream&lt;Long, Order&gt;,
Function&lt;KTable&lt;Long, Customer&gt;,
Function&lt;GlobalKTable&lt;Long, Product&gt;, KStream&lt;Long, Order&gt;&gt;&gt;&gt; process() {
return orderStream -&gt;
customers -&gt;
products -&gt; orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>Then you set the above <code>TimestampExtractor</code> bean name per consumer binding.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor</code></pre>
</div>
</div>
<div class="paragraph">
<p>If you skip an input consumer binding for setting a custom timestamp extractor, that consumer will use the default settings.</p>
</div>
</div>
<div class="sect2">
<h3 id="_multi_binders_with_kafka_streams_based_binders_and_regular_kafka_binder"><a class="link" href="#_multi_binders_with_kafka_streams_based_binders_and_regular_kafka_binder">Multi binders with Kafka Streams based binders and regular Kafka Binder</a></h3>
<div class="paragraph">
<p>You can have an application where you have both a function/consumer/supplier that is based on the regular Kafka binder and a Kafka Streams based processor.
However, you cannot mix both of them within a single function or consumer.</p>
</div>
<div class="paragraph">
<p>Here is an example in which components based on both binders live within the same application.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Function&lt;String, String&gt; process() {
return s -&gt; s;
}
@Bean
public Function&lt;KStream&lt;Object, String&gt;, KStream&lt;Object, String&gt;&gt; kstreamProcess() {
return input -&gt; input;
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>These are the relevant parts of the configuration:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar</code></pre>
</div>
</div>
<div class="paragraph">
<p>Things become a bit more complex if the same application deals with two different Kafka clusters. For example, the regular <code>process</code> function acts upon both Kafka cluster 1 and cluster 2 (receiving data from cluster 1 and sending to cluster 2), while the Kafka Streams processor acts upon Kafka cluster 2.
Then you have to use the <a href="https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#multiple-binders">multi binder</a> facilities provided by Spring Cloud Stream.</p>
</div>
<div class="paragraph">
<p>Here is how your configuration may change in that scenario.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># multi binder configuration
# Replace kafkaCluster-1 and kafkaCluster-2 with the appropriate broker addresses of each cluster
spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=${kafkaCluster-1}
spring.cloud.stream.binders.kafka2.type=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type=kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
# source from cluster 1
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1
# send to cluster 2
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3</code></pre>
</div>
</div>
<div class="paragraph">
<p>Pay attention to the above configuration.
We have two kinds of binders but three binders in total: the regular Kafka binder based on cluster 1 (<code>kafka1</code>), another Kafka binder based on cluster 2 (<code>kafka2</code>), and finally the <code>kstream</code> one (<code>kafka3</code>).
The first processor in the application receives data from <code>kafka1</code> and publishes to <code>kafka2</code>; both binders are regular Kafka binders, but they point to different clusters.
The second processor, which is a Kafka Streams processor, consumes data from <code>kafka3</code>, which points to the same cluster as <code>kafka2</code> but is a different binder type.</p>
</div>
<div class="paragraph">
<p>The Kafka Streams family of binders has three different binder types (<code>kstream</code>, <code>ktable</code> and <code>globalktable</code>). If your application has multiple bindings based on any of these binders, the binder type must be provided explicitly for each of them.</p>
</div>
<div class="paragraph">
<p>For example, if you have a processor as below,</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>@Bean
public Function&lt;KStream&lt;Long, Order&gt;,
Function&lt;KTable&lt;Long, Customer&gt;,
Function&lt;GlobalKTable&lt;Long, Product&gt;, KStream&lt;Long, EnrichedOrder&gt;&gt;&gt;&gt; enrichOrder() {
...
}</code></pre>
</div>
</div>
<div class="paragraph">
<p>then it has to be configured as follows in a multi-binder scenario.
Note that this is only needed in a true multi-binder scenario, where multiple processors within a single application deal with multiple clusters.
In that case, the binders need to be provided explicitly with the bindings to distinguish them from the binder types and clusters of other processors.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.binders.kafka1.type=kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type=ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type=globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
# kstream
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1
# ktable
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2
# globalktable
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3
# kstream
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1
# rest of the configuration is omitted.</code></pre>
</div>
</div>
</div>
<div class="sect2">
<h3 id="_state_cleanup"><a class="link" href="#_state_cleanup">State Cleanup</a></h3>
<div class="paragraph">
<p>By default, the <code>KafkaStreams.cleanUp()</code> method is called when the binding is stopped.
See <a href="https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration">the Spring Kafka documentation</a>.
To modify this behavior, add a single <code>CleanupConfig</code> <code>@Bean</code> (configured to clean up on start, stop, or neither) to the application context; the bean is detected and wired into the factory bean.</p>
</div>
</div>
<div class="sect2">
<h3 id="_kafka_streams_topology_visualization"><a class="link" href="#_kafka_streams_topology_visualization">Kafka Streams topology visualization</a></h3>
<div class="paragraph">
<p>The Kafka Streams binder provides the following actuator endpoints for retrieving the topology description, which you can then visualize using external tools.</p>
</div>
<div class="paragraph">
<p><code>/actuator/kafkastreamstopology</code></p>
</div>
<div class="paragraph">
<p><code>/actuator/kafkastreamstopology/&lt;application-id of the processor&gt;</code></p>
</div>
<div class="paragraph">
<p>You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
You also need to add <code>kafkastreamstopology</code> to the <code>management.endpoints.web.exposure.include</code> property.
By default, the <code>kafkastreamstopology</code> endpoint is disabled.</p>
</div>
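<div class="paragraph">
<p>As a minimal sketch, assuming the application is configured through a properties file, the endpoint can be exposed over the web as follows.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Expose the topology endpoint over HTTP (it is not exposed by default)
management.endpoints.web.exposure.include=kafkastreamstopology</code></pre>
</div>
</div>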
</div>
<div class="sect2">
<h3 id="_configuration_options"><a class="link" href="#_configuration_options">Configuration Options</a></h3>
<div class="paragraph">
<p>This section contains the configuration options used by the Kafka Streams binder.</p>
</div>
<div class="paragraph">
<p>For common configuration options and properties pertaining to the binder, refer to the <a href="#binding-properties">core documentation</a>.</p>
</div>
<div class="sect3">
<h4 id="_kafka_streams_binder_properties"><a class="link" href="#_kafka_streams_binder_properties">Kafka Streams Binder Properties</a></h4>
<div class="paragraph">
<p>The following properties are available at the binder level and must be prefixed with <code>spring.cloud.stream.kafka.streams.binder.</code></p>
</div>
<div class="dlist">
<dl>
<dt class="hdlist1">configuration</dt>
<dd>
<p>Map of key/value pairs containing properties pertaining to the Apache Kafka Streams API.
This property must be prefixed with <code>spring.cloud.stream.kafka.streams.binder.</code>.
The following are some examples of using this property.</p>
</dd>
</dl>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code>spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000</code></pre>
</div>
</div>
<div class="paragraph">
<p>For more information about all the properties that may go into the streams configuration, see the <code>StreamsConfig</code> JavaDocs in the Apache Kafka Streams docs.
Any configuration that you can set through <code>StreamsConfig</code> can be set through this property.
Because this is a binder-level property, it applies to the entire application.
If you have more than one processor in the application, all of them acquire these properties.
For properties such as <code>application.id</code>, this becomes problematic, so you have to carefully examine how the properties from <code>StreamsConfig</code> are mapped when you use this binder-level <code>configuration</code> property.</p>
</div>
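<div class="paragraph">
<p>The following sketch illustrates one way to avoid sharing <code>application.id</code> across processors: keep shared settings in the binder-level <code>configuration</code> map and set the application ID per function by using the <code>functions.&lt;function-bean-name&gt;</code> properties described below.
The function name <code>auditOrder</code> and both application ID values are hypothetical and serve only as an illustration.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Shared by every processor in the application
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
# A distinct application ID for each function (names and values are illustrative)
spring.cloud.stream.kafka.streams.binder.functions.enrichOrder.applicationId=enrich-order-app
spring.cloud.stream.kafka.streams.binder.functions.auditOrder.applicationId=audit-order-app
# A StreamsConfig property set for the auditOrder function only
spring.cloud.stream.kafka.streams.binder.functions.auditOrder.configuration.commit.interval.ms=5000</code></pre>
</div>
</div>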
<div class="dlist">
<dl>
<dt class="hdlist1">functions.&lt;function-bean-name&gt;.applicationId</dt>
<dd>
<p>Applicable only for functional style processors.
This can be used for setting application ID per function in the application.
In the case of multiple functions, this is a handy way to set the application ID.</p>
</dd>
<dt class="hdlist1">functions.&lt;function-bean-name&gt;.configuration</dt>
<dd>
<p>Applicable only for functional style processors.
Map with a key/value pair containing properties pertaining to Apache Kafka Streams API.
This is similar to the binder-level <code>configuration</code> property described above, but this <code>configuration</code> property applies only to the named function.
When you have multiple processors and you want to scope configuration to particular functions, you might want to use this.
All <code>StreamsConfig</code> properties can be used here.</p>
</dd>
<dt class="hdlist1">brokers</dt>
<dd>
<p>Broker URL</p>
<div class="paragraph">
<p>Default: <code>localhost</code></p>
</div>
</dd>
<dt class="hdlist1">zkNodes</dt>
<dd>
<p>Zookeeper URL</p>
<div class="paragraph">
<p>Default: <code>localhost</code></p>
</div>
</dd>
<dt class="hdlist1">deserializationExceptionHandler</dt>
<dd>
<p>Deserialization error handler type.
This handler is applied at the binder level and thus applies to all input bindings in the application.
There is a way to control it in a more fine-grained way at the consumer binding level.
Possible values are - <code>logAndContinue</code>, <code>logAndFail</code> or <code>sendToDlq</code></p>
<div class="paragraph">
<p>Default: <code>logAndFail</code></p>
</div>
</dd>
<dt class="hdlist1">applicationId</dt>
<dd>
<p>Convenient way to set the application.id for the Kafka Streams application globally at the binder level.
If the application contains multiple functions or <code>StreamListener</code> methods, then the application id should be set differently.
See above where setting the application id is discussed in detail.</p>
<div class="paragraph">
<p>Default: application will generate a static application ID. See the application ID section for more details.</p>
</div>
</dd>
<dt class="hdlist1">stateStoreRetry.maxAttempts</dt>
<dd>
<p>Max attempts for trying to connect to a state store.</p>
<div class="paragraph">
<p>Default: 1</p>
</div>
</dd>
<dt class="hdlist1">stateStoreRetry.backoffPeriod</dt>
<dd>
<p>Backoff period when trying to connect to a state store on a retry.</p>
<div class="paragraph">
<p>Default: 1000 ms</p>
</div>
</dd>
</dl>
</div>
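<div class="paragraph">
<p>As an illustration, several of the binder-level properties listed above could be combined as in the following sketch.
The broker address and the retry values are assumptions chosen for the example, not recommended settings.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Broker to connect to (illustrative address)
spring.cloud.stream.kafka.streams.binder.brokers=localhost:9092
# Send records that fail deserialization to a DLQ for all input bindings
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq
# Retry state store access up to 3 times, waiting 2000 ms between attempts
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts=3
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod=2000</code></pre>
</div>
</div>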
</div>
<div class="sect3">
<h4 id="_kafka_streams_producer_properties"><a class="link" href="#_kafka_streams_producer_properties">Kafka Streams Producer Properties</a></h4>
<div class="paragraph">
<p>The following properties are <em>only</em> available for Kafka Streams producers and must be prefixed with <code>spring.cloud.stream.kafka.streams.bindings.&lt;binding-name&gt;.producer.</code>.
For convenience, if there are multiple output bindings and they all require a common value, that value can be configured by using the prefix <code>spring.cloud.stream.kafka.streams.default.producer.</code>.</p>
</div>
<div class="dlist">
<dl>
<dt class="hdlist1">keySerde</dt>
<dd>
<p>key serde to use</p>
<div class="paragraph">
<p>Default: See the above discussion on message de/serialization</p>
</div>
</dd>
<dt class="hdlist1">valueSerde</dt>
<dd>
<p>value serde to use</p>
<div class="paragraph">
<p>Default: See the above discussion on message de/serialization</p>
</div>
</dd>
<dt class="hdlist1">useNativeEncoding</dt>
<dd>
<p>flag to enable/disable native encoding</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">streamPartitionerBeanName</dt>
<dd>
<p>Custom outbound partitioner bean name to be used at the producer.
Applications can provide a custom <code>StreamPartitioner</code> as a Spring bean, and the name of this bean can be provided to the producer to use instead of the default one.</p>
<div class="paragraph">
<p>Default: See the discussion above on outbound partition support.</p>
</div>
</dd>
</dl>
</div>
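<div class="paragraph">
<p>The following sketch shows how the producer properties might be set for a single output binding, together with a common value applied through the default prefix.
The binding name <code>process-out-0</code> and the bean name <code>myPartitioner</code> are hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Applies to all output bindings unless overridden per binding
spring.cloud.stream.kafka.streams.default.producer.useNativeEncoding=true
# Settings for one output binding (binding and bean names are illustrative)
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName=myPartitioner</code></pre>
</div>
</div>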
</div>
<div class="sect3">
<h4 id="_kafka_streams_consumer_properties"><a class="link" href="#_kafka_streams_consumer_properties">Kafka Streams Consumer Properties</a></h4>
<div class="paragraph">
<p>The following properties are available for Kafka Streams consumers and must be prefixed with <code>spring.cloud.stream.kafka.streams.bindings.&lt;binding-name&gt;.consumer.</code>.
For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix <code>spring.cloud.stream.kafka.streams.default.consumer.</code>.</p>
</div>
<div class="dlist">
<dl>
<dt class="hdlist1">applicationId</dt>
<dd>
<p>Sets the application.id per input binding. This is only preferred for <code>StreamListener</code> based processors; for function based processors, see the other approaches outlined above.</p>
<div class="paragraph">
<p>Default: See above.</p>
</div>
</dd>
<dt class="hdlist1">keySerde</dt>
<dd>
<p>key serde to use</p>
<div class="paragraph">
<p>Default: See the above discussion on message de/serialization</p>
</div>
</dd>
<dt class="hdlist1">valueSerde</dt>
<dd>
<p>value serde to use</p>
<div class="paragraph">
<p>Default: See the above discussion on message de/serialization</p>
</div>
</dd>
<dt class="hdlist1">materializedAs</dt>
<dd>
<p>state store to materialize when using incoming KTable types</p>
<div class="paragraph">
<p>Default: <code>none</code>.</p>
</div>
</dd>
<dt class="hdlist1">useNativeDecoding</dt>
<dd>
<p>flag to enable/disable native decoding</p>
<div class="paragraph">
<p>Default: <code>true</code>.</p>
</div>
</dd>
<dt class="hdlist1">dlqName</dt>
<dd>
<p>DLQ topic name.</p>
<div class="paragraph">
<p>Default: See above on the discussion of error handling and DLQ.</p>
</div>
</dd>
<dt class="hdlist1">startOffset</dt>
<dd>
<p>Offset to start from if there is no committed offset to consume from.
This is mostly used when the consumer is consuming from a topic for the first time.
Kafka Streams uses <code>earliest</code> as the default strategy and the binder uses the same default.
This can be overridden to <code>latest</code> using this property.</p>
<div class="paragraph">
<p>Default: <code>earliest</code>.</p>
</div>
</dd>
</dl>
</div>
<div class="paragraph">
<p>Note: Using <code>resetOffsets</code> on the consumer does not have any effect on the Kafka Streams binder.
Unlike the message channel based binder, the Kafka Streams binder does not seek to the beginning or end on demand.</p>
</div>
<div class="dlist">
<dl>
<dt class="hdlist1">deserializationExceptionHandler</dt>
<dd>
<p>Deserialization error handler type.
This handler is applied per consumer binding as opposed to the binder level property described before.
Possible values are - <code>logAndContinue</code>, <code>logAndFail</code> or <code>sendToDlq</code></p>
<div class="paragraph">
<p>Default: <code>logAndFail</code></p>
</div>
</dd>
<dt class="hdlist1">timestampExtractorBeanName</dt>
<dd>
<p>Specific timestamp extractor bean name to be used at the consumer.
Applications can provide a <code>TimestampExtractor</code> as a Spring bean, and the name of this bean can be provided to the consumer to use instead of the default one.</p>
<div class="paragraph">
<p>Default: See the discussion above on timestamp extractors.</p>
</div>
</dd>
</dl>
</div>
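<div class="paragraph">
<p>As an illustration, the consumer properties above could be applied to the input bindings of the <code>enrichOrder</code> function shown earlier, as in the following sketch.
The DLQ topic name and the state store name are hypothetical.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Applies to every input binding unless overridden per binding
spring.cloud.stream.kafka.streams.default.consumer.startOffset=latest
# First input (KStream): route deserialization failures to a named DLQ topic
spring.cloud.stream.kafka.streams.bindings.enrichOrder-in-0.consumer.deserializationExceptionHandler=sendToDlq
spring.cloud.stream.kafka.streams.bindings.enrichOrder-in-0.consumer.dlqName=orders-dlq
# Second input (KTable): materialize the incoming table as a named state store
spring.cloud.stream.kafka.streams.bindings.enrichOrder-in-1.consumer.materializedAs=customers-store</code></pre>
</div>
</div>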
</div>
<div class="sect3">
<h4 id="_special_note_on_concurrency"><a class="link" href="#_special_note_on_concurrency">Special note on concurrency</a></h4>
<div class="paragraph">
<p>In Kafka Streams, you can control the number of threads a processor can create by using the <code>num.stream.threads</code> property.
You can set it through the various <code>configuration</code> options described above at the binder, function, producer, or consumer level.
You can also use the <code>concurrency</code> property that core Spring Cloud Stream provides for this purpose.
When using it, you need to set it on the consumer.
When you have more than one input binding, either in a function or a <code>StreamListener</code>, set it on the first input binding.
For example, when you set <code>spring.cloud.stream.bindings.process-in-0.consumer.concurrency</code>, the binder translates it to <code>num.stream.threads</code>.
If you have multiple processors and one processor defines binding-level concurrency but the others do not, the ones without binding-level concurrency fall back to the binder-wide property specified through
<code>spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads</code>.
If this binder configuration is not available, the application uses the default set by Kafka Streams.</p>
</div>
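<div class="paragraph">
<p>The following sketch illustrates the fallback described above.
It assumes a processor named <code>process</code> and uses the binder-level prefix given in the binder properties section for the binder-wide value; the thread counts are arbitrary.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="highlightjs highlight"><code># Binding-level concurrency on the first input binding of the process function;
# the binder translates this to num.stream.threads for that processor
spring.cloud.stream.bindings.process-in-0.consumer.concurrency=3
# Binder-wide value used by processors that do not define binding-level concurrency
spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads=2</code></pre>
</div>
</div>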
</div>
</div>
</div>
</div>
</div>
<script type="text/javascript" src="js/tocbot/tocbot.min.js"></script>
<script type="text/javascript" src="js/toc.js"></script>
<link rel="stylesheet" href="js/highlight/styles/atom-one-dark-reasonable.min.css">
<script src="js/highlight/highlight.min.js"></script>
<script>hljs.initHighlighting()</script>
</body>
</html>