spring-cloud-static/Greenwich.M3/multi/multi__apache_kafka_streams_binder.html
2018-11-27 10:29:13 -05:00


<html><head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>39.&nbsp;Apache Kafka Streams Binder</title><link rel="stylesheet" type="text/css" href="css/manual-multipage.css"><meta name="generator" content="DocBook XSL Stylesheets V1.78.1"><link rel="home" href="multi_spring-cloud.html" title="Spring Cloud"><link rel="up" href="multi__binder_implementations.html" title="Part&nbsp;VI.&nbsp;Binder Implementations"><link rel="prev" href="multi__apache_kafka_binder.html" title="38.&nbsp;Apache Kafka Binder"><link rel="next" href="multi__rabbitmq_binder.html" title="40.&nbsp;RabbitMQ Binder"></head><body bgcolor="white" text="black" link="#0000FF" vlink="#840084" alink="#0000FF"><div class="navheader"><table width="100%" summary="Navigation header"><tr><th colspan="3" align="center">39.&nbsp;Apache Kafka Streams Binder</th></tr><tr><td width="20%" align="left"><a accesskey="p" href="multi__apache_kafka_binder.html">Prev</a>&nbsp;</td><th width="60%" align="center">Part&nbsp;VI.&nbsp;Binder Implementations</th><td width="20%" align="right">&nbsp;<a accesskey="n" href="multi__rabbitmq_binder.html">Next</a></td></tr></table><hr></div><div class="chapter"><div class="titlepage"><div><div><h2 class="title"><a name="_apache_kafka_streams_binder" href="#_apache_kafka_streams_binder"></a>39.&nbsp;Apache Kafka Streams Binder</h2></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_usage_2" href="#_usage_2"></a>39.1&nbsp;Usage</h2></div></div></div><p>For using the Kafka Streams binder, you just need to add it to your Spring Cloud Stream application, using the following
Maven coordinates:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;dependency&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;groupId&gt;</span>org.springframework.cloud<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/groupId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;artifactId&gt;</span>spring-cloud-stream-binder-kafka-streams<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/artifactId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/dependency&gt;</span></pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_kafka_streams_binder_overview" href="#_kafka_streams_binder_overview"></a>39.2&nbsp;Kafka Streams Binder Overview</h2></div></div></div><p>Spring Cloud Stream&#8217;s Apache Kafka support also includes a binder implementation designed explicitly for Apache Kafka
Streams binding. With this native integration, a Spring Cloud Stream "processor" application can directly use the
<a class="link" href="https://kafka.apache.org/documentation/streams/developer-guide" target="_top">Apache Kafka Streams</a> APIs in the core business logic.</p><p>Kafka Streams binder implementation builds on the foundation provided by the <a class="link" href="http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams" target="_top">Kafka Streams in Spring Kafka</a>
project.</p><p>Kafka Streams binder provides binding capabilities for the three major types in Kafka Streams - KStream, KTable and GlobalKTable.</p><p>As part of this native integration, the high-level <a class="link" href="https://docs.confluent.io/current/streams/developer-guide/dsl-api.html" target="_top">Streams DSL</a>
provided by the Kafka Streams API is available for use in the business logic.</p><p>An early version of the <a class="link" href="https://docs.confluent.io/current/streams/developer-guide/processor-api.html" target="_top">Processor API</a>
support is available as well.</p><p>Kafka Streams support in Spring Cloud Stream is available only in the Processor model:
messages are read from an inbound topic, business processing is applied, and the transformed messages
are written to an outbound topic. It can also be used in Processor applications that have no outbound destination.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_streams_dsl" href="#_streams_dsl"></a>39.2.1&nbsp;Streams DSL</h3></div></div></div><p>The following application consumes data from a Kafka topic (e.g., <code class="literal">words</code>), computes a word count for each unique word in a 5-second
time window, and sends the computed results to a downstream topic (e.g., <code class="literal">counts</code>) for further processing.</p><pre class="screen">@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream&lt;?, WordCount&gt; process(KStream&lt;?, String&gt; input) {
        return input
                .flatMapValues(value -&gt; Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -&gt; value)
                .windowedBy(TimeWindows.of(5000)) // 5-second windows
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -&gt; new KeyValue&lt;&gt;(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}</pre><p>Once built as an uber-jar (e.g., <code class="literal">wordcount-processor.jar</code>), you can run the above example as follows.</p><pre class="screen">java -jar wordcount-processor.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts</pre><p>This application will consume messages from the Kafka topic <code class="literal">words</code> and the computed results are published to an output
topic <code class="literal">counts</code>.</p><p>Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
required in the processor. Setting up the Streams DSL specific configuration required by the Kafka Streams infrastructure
is automatically handled by the framework.</p></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_configuration_options_3" href="#_configuration_options_3"></a>39.3&nbsp;Configuration Options</h2></div></div></div><p>This section contains the configuration options used by the Kafka Streams binder.</p><p>For common configuration options and properties pertaining to binder, refer to the <a class="link" href="multi__configuration_options.html#binding-properties" title="30.2&nbsp;Binding Properties">core documentation</a>.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_kafka_streams_properties" href="#_kafka_streams_properties"></a>39.3.1&nbsp;Kafka Streams Properties</h3></div></div></div><p>The following properties are available at the binder level and must be prefixed with <code class="literal">spring.cloud.stream.kafka.streams.binder.</code>
literal.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">configuration</span></dt><dd> Map of key/value pairs containing properties pertaining to the Apache Kafka Streams API.
This property must be prefixed with <code class="literal">spring.cloud.stream.kafka.streams.binder.</code>.
The following are some examples of using this property.</dd></dl></div><pre class="screen">spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000</pre><p>For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs in
Apache Kafka Streams docs.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">brokers</span></dt><dd><p class="simpara">Broker URL</p><p class="simpara">Default: <code class="literal">localhost</code></p></dd><dt><span class="term">zkNodes</span></dt><dd><p class="simpara">Zookeeper URL</p><p class="simpara">Default: <code class="literal">localhost</code></p></dd><dt><span class="term">serdeError</span></dt><dd><p class="simpara">Deserialization error handler type.
Possible values are - <code class="literal">logAndContinue</code>, <code class="literal">logAndFail</code> or <code class="literal">sendToDlq</code></p><p class="simpara">Default: <code class="literal">logAndFail</code></p></dd><dt><span class="term">applicationId</span></dt><dd><p class="simpara">Convenient way to set the application.id for the Kafka Streams application globally at the binder level.
If the application contains multiple <code class="literal">StreamListener</code> methods, then application.id should be set at the binding level per input binding.</p><p class="simpara">Default: <code class="literal">none</code></p></dd></dl></div><p>The following properties are <span class="emphasis"><em>only</em></span> available for Kafka Streams producers and must be prefixed with <code class="literal">spring.cloud.stream.kafka.streams.bindings.&lt;binding name&gt;.producer.</code> literal.
For convenience, if there are multiple output bindings and they all require a common value, that can be configured by using the prefix <code class="literal">spring.cloud.stream.kafka.streams.default.producer.</code>.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">keySerde</span></dt><dd><p class="simpara">key serde to use</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">valueSerde</span></dt><dd><p class="simpara">value serde to use</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">useNativeEncoding</span></dt><dd><p class="simpara">flag to enable native encoding</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd></dl></div><p>The following properties are <span class="emphasis"><em>only</em></span> available for Kafka Streams consumers and must be prefixed with <code class="literal">spring.cloud.stream.kafka.streams.bindings.&lt;binding name&gt;.consumer.</code> literal.
For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix <code class="literal">spring.cloud.stream.kafka.streams.default.consumer.</code>.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">applicationId</span></dt><dd><p class="simpara">Setting application.id per input binding.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">keySerde</span></dt><dd><p class="simpara">key serde to use</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">valueSerde</span></dt><dd><p class="simpara">value serde to use</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">materializedAs</span></dt><dd><p class="simpara">state store to materialize when using incoming KTable types</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">useNativeDecoding</span></dt><dd><p class="simpara">flag to enable native decoding</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">dlqName</span></dt><dd><p class="simpara">DLQ topic name.</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd></dl></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_timewindow_properties" href="#_timewindow_properties"></a>39.3.2&nbsp;TimeWindow properties:</h3></div></div></div><p>Windowing is an important concept in stream processing applications. The following properties are available to configure
time-window computations.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">spring.cloud.stream.kafka.streams.timeWindow.length</span></dt><dd><p class="simpara">When this property is given, you can autowire a <code class="literal">TimeWindows</code> bean into the application.
The value is expressed in milliseconds.</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.streams.timeWindow.advanceBy</span></dt><dd><p class="simpara">Value is given in milliseconds.</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd></dl></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_multiple_input_bindings" href="#_multiple_input_bindings"></a>39.4&nbsp;Multiple Input Bindings</h2></div></div></div><p>For use cases that require multiple incoming KStream objects or a combination of KStream and KTable objects, the Kafka
Streams binder provides multiple bindings support.</p><p>Let&#8217;s see it in action.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_multiple_input_bindings_as_a_sink" href="#_multiple_input_bindings_as_a_sink"></a>39.4.1&nbsp;Multiple Input Bindings as a Sink</h3></div></div></div><pre class="screen">@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream&lt;String, PlayEvent&gt; playEvents,
                    @Input("inputTable") KTable&lt;Long, Song&gt; songTable) {
    ....
    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream&lt;?, ?&gt; inputStream();

    @Input("inputTable")
    KTable&lt;?, ?&gt; inputTable();
}</pre><p>In the above example, the application is written as a sink, i.e. there are no output bindings and the application itself has to
decide what to do with the results. When you write applications in this style, you might want to send the information
downstream yourself or store it in a state store (see below for Queryable State Stores).</p><p>In the case of an incoming KTable, if you want to materialize the computations to a state store, you have to express it
through the following property.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.inputTable.consumer.materializedAs: all-songs</pre><p>The above example shows the use of KTable as an input binding.
The binder also supports input bindings for GlobalKTable.
A GlobalKTable binding is useful when you have to ensure that all instances of your application have access to the data updates from the topic.
KTable and GlobalKTable bindings are only available on the input.
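</p><p>As a rough illustration of that difference, the following plain-Java sketch contrasts a partitioned, KTable-like view, where an instance only sees the keys of the partitions it owns, with a GlobalKTable-like view, where every instance sees every update. It does not use Kafka at all: the class and method names are hypothetical, and the hash-based partitioning is a deliberate simplification of what Kafka actually does.</p><pre class="screen">import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableVisibilitySketch {

    /** Hypothetical stand-in for a keyed record read from the table topic. */
    static final class Update {
        final String key;
        final String value;

        Update(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Simplified partitioning for illustration only; Kafka uses its own partitioner.
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    // KTable-like view: an instance only sees updates for the partition it owns.
    static Map&lt;String, String&gt; partitionedView(List&lt;Update&gt; updates, int ownedPartition, int partitions) {
        Map&lt;String, String&gt; view = new HashMap&lt;&gt;();
        for (Update u : updates) {
            if (partitionFor(u.key, partitions) == ownedPartition) {
                view.put(u.key, u.value); // the latest value per key wins
            }
        }
        return view;
    }

    // GlobalKTable-like view: every instance sees every update.
    static Map&lt;String, String&gt; globalView(List&lt;Update&gt; updates) {
        Map&lt;String, String&gt; view = new HashMap&lt;&gt;();
        for (Update u : updates) {
            view.put(u.key, u.value);
        }
        return view;
    }

    public static void main(String[] args) {
        List&lt;Update&gt; updates = Arrays.asList(
                new Update("a", "song-1"),
                new Update("b", "song-2"),
                new Update("a", "song-3"));
        System.out.println(partitionedView(updates, 0, 2));
        System.out.println(partitionedView(updates, 1, 2));
        System.out.println(globalView(updates));
    }
}</pre><p>With two partitions in this sketch, the instance owning partition 0 holds only key <code class="literal">b</code>, the instance owning partition 1 holds only key <code class="literal">a</code>, and the global view holds both keys.</p><p>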
Binder supports both input and output bindings for KStream.</p></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_multiple_input_bindings_as_a_processor" href="#_multiple_input_bindings_as_a_processor"></a>39.4.2&nbsp;Multiple Input Bindings as a Processor</h3></div></div></div><pre class="screen">@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream&lt;String, Long&gt; process(@Input("input") KStream&lt;String, Long&gt; userClicksStream,
                                     @Input("inputTable") KTable&lt;String, String&gt; userRegionsTable) {
    ....
    ....
}

// KafkaStreamsProcessor already declares the "input" and "output" KStream bindings.
interface KStreamKTableBinding extends KafkaStreamsProcessor {

    @Input("inputTable")
    KTable&lt;?, ?&gt; inputTable();
}</pre></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_multiple_output_bindings_aka_branching" href="#_multiple_output_bindings_aka_branching"></a>39.5&nbsp;Multiple Output Bindings (aka Branching)</h2></div></div></div><p>Kafka Streams allows outbound data to be split into multiple topics based on predicates. The Kafka Streams binder provides
support for this feature without compromising the programming model exposed through <code class="literal">StreamListener</code> in the end user application.</p><p>You can write the application in the usual way as demonstrated above in the word count example. However, when using the
branching feature, you are required to do a few things. First, you need to make sure that your return type is <code class="literal">KStream[]</code>
instead of a regular <code class="literal">KStream</code>. Second, you need to use the <code class="literal">SendTo</code> annotation, listing the output bindings in the same order as the branches
(see example below). For each of these output bindings, you need to configure the destination, content type, etc., complying with
the standard Spring Cloud Stream expectations.</p><p>Here is an example:</p><pre class="screen">@EnableBinding(KStreamProcessorWithBranches.class)
@EnableAutoConfiguration
public static class WordCountProcessorApplication {

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo({"output1","output2","output3"})
    public KStream&lt;?, WordCount&gt;[] process(KStream&lt;Object, String&gt; input) {

        Predicate&lt;Object, WordCount&gt; isEnglish = (k, v) -&gt; v.word.equals("english");
        Predicate&lt;Object, WordCount&gt; isFrench = (k, v) -&gt; v.word.equals("french");
        Predicate&lt;Object, WordCount&gt; isSpanish = (k, v) -&gt; v.word.equals("spanish");

        return input
                .flatMapValues(value -&gt; Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -&gt; value)
                .windowedBy(timeWindows)
                .count(Materialized.as("WordCounts-1"))
                .toStream()
                .map((key, value) -&gt; new KeyValue&lt;&gt;(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
                // Each record goes to the first predicate it matches; the branch order follows the @SendTo order.
                .branch(isEnglish, isFrench, isSpanish);
    }

    interface KStreamProcessorWithBranches {

        @Input("input")
        KStream&lt;?, ?&gt; input();

        @Output("output1")
        KStream&lt;?, ?&gt; output1();

        @Output("output2")
        KStream&lt;?, ?&gt; output2();

        @Output("output3")
        KStream&lt;?, ?&gt; output3();
    }
}</pre><p>Properties:</p><pre class="screen">spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/json
spring.cloud.stream.bindings.output3.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output1:
  destination: foo
  producer:
    headerMode: raw
spring.cloud.stream.bindings.output2:
  destination: bar
  producer:
    headerMode: raw
spring.cloud.stream.bindings.output3:
  destination: fox
  producer:
    headerMode: raw
spring.cloud.stream.bindings.input:
  destination: words
  consumer:
    headerMode: raw</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_message_conversion" href="#_message_conversion"></a>39.6&nbsp;Message Conversion</h2></div></div></div><p>Similar to message-channel based binder applications, the Kafka Streams binder adapts to the out-of-the-box content-type
conversions without any compromise.</p><p>It is typical for Kafka Streams operations to know the type of SerDe&#8217;s used to transform the key and value correctly.
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself at
the inbound and outbound conversions rather than using the content-type conversions offered by the framework.
On the other hand, you might already be familiar with the content-type conversion patterns provided by the framework and
would like to continue using them for inbound and outbound conversions.</p><p>Both options are supported in the Kafka Streams binder implementation.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_outbound_serialization" href="#_outbound_serialization"></a>39.6.1&nbsp;Outbound serialization</h3></div></div></div><p>If native encoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default <code class="literal">application/json</code> will be applied). It will ignore any SerDe set on the outbound
in this case for outbound serialization.</p><p>Here is the property to set the contentType on the outbound.</p><pre class="screen">spring.cloud.stream.bindings.output.contentType: application/json</pre><p>Here is the property to enable native encoding.</p><pre class="screen">spring.cloud.stream.bindings.output.producer.useNativeEncoding: true</pre><p>If native encoding is enabled on the output binding (the user has to enable it explicitly, as above), then the framework will
skip any form of automatic message conversion on the outbound. In that case, it will switch to the SerDe set by the user.
The <code class="literal">valueSerde</code> property set on the actual output binding will be used. Here is an example.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde</pre><p>If this property is not set, then it will use the "default" SerDe: <code class="literal">spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde</code>.</p><p>It is worth mentioning that the Kafka Streams binder does not serialize the keys on outbound - it simply relies on Kafka itself.
Therefore, you either have to specify the <code class="literal">keySerde</code> property on the binding or it will default to the application-wide common
<code class="literal">keySerde</code>.</p><p>Binding level key serde:</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde</pre><p>Common Key serde:</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde</pre><p>If branching is used, then you need to use multiple output bindings. For example,</p><pre class="screen">interface KStreamProcessorWithBranches {

    @Input("input")
    KStream&lt;?, ?&gt; input();

    @Output("output1")
    KStream&lt;?, ?&gt; output1();

    @Output("output2")
    KStream&lt;?, ?&gt; output2();

    @Output("output3")
    KStream&lt;?, ?&gt; output3();
}</pre><p>If <code class="literal">useNativeEncoding</code> is set, then you can set different SerDe&#8217;s on individual output bindings as below.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde</pre><p>Then, if you have a <code class="literal">SendTo</code> annotation such as <code class="literal">@SendTo({"output1", "output2", "output3"})</code>, the <code class="literal">KStream[]</code> elements from the branches are
serialized with the SerDe objects defined above. If you are not enabling <code class="literal">useNativeEncoding</code>, you can instead set different
contentType values on the output bindings as below. In that case, the framework will use the appropriate message converter
to convert the messages before sending them to Kafka.</p><pre class="screen">spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/x-java-serialized-object
spring.cloud.stream.bindings.output3.contentType: application/octet-stream</pre></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_inbound_deserialization" href="#_inbound_deserialization"></a>39.6.2&nbsp;Inbound Deserialization</h3></div></div></div><p>Similar rules apply to data deserialization on the inbound.</p><p>If native decoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default <code class="literal">application/json</code> will be applied). It will ignore any SerDe set on the inbound
in this case for inbound deserialization.</p><p>Here is the property to set the contentType on the inbound.</p><pre class="screen">spring.cloud.stream.bindings.input.contentType: application/json</pre><p>Here is the property to enable native decoding.</p><pre class="screen">spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true</pre><p>If native decoding is enabled on the input binding (the user has to enable it explicitly, as above), then the framework will
skip doing any message conversion on the inbound. In that case, it will switch to the SerDe set by the user. The <code class="literal">valueSerde</code>
property set on the actual input binding will be used. Here is an example.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde</pre><p>If this property is not set, it will use the default SerDe: <code class="literal">spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde</code>.</p><p>It is worth mentioning that the Kafka Streams binder does not deserialize the keys on inbound - it simply relies on Kafka itself.
Therefore, you either have to specify the <code class="literal">keySerde</code> property on the binding or it will default to the application-wide common
<code class="literal">keySerde</code>.</p><p>Binding level key serde:</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde</pre><p>Common Key serde:</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde</pre><p>As in the case of KStream branching on the outbound, the benefit of setting value SerDe per binding is that if you have
multiple input bindings (multiple KStream objects) and they all require separate value SerDe&#8217;s, then you can configure
them individually. If you use the common configuration approach, then this feature won&#8217;t be applicable.</p></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_error_handling" href="#_error_handling"></a>39.7&nbsp;Error Handling</h2></div></div></div><p>Apache Kafka Streams provides the capability for natively handling exceptions from deserialization errors.
For details on this support, please see <a class="link" href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers" target="_top">KIP-161</a>.
Out of the box, Apache Kafka Streams provides two kinds of deserialization exception handlers - <code class="literal">logAndContinue</code> and <code class="literal">logAndFail</code>.
As the names indicate, the former will log the error and continue processing the next records, and the latter will log the
error and fail. <code class="literal">logAndFail</code> is the default deserialization exception handler.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_handling_deserialization_exceptions" href="#_handling_deserialization_exceptions"></a>39.7.1&nbsp;Handling Deserialization Exceptions</h3></div></div></div><p>Kafka Streams binder supports a selection of exception handlers through the following properties.</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue</pre><p>In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous
records (poison pills) to a DLQ topic. Here is how you enable this DLQ exception handler.</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq</pre><p>When the above property is set, all the deserialization error records are automatically sent to the DLQ topic.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq</pre><p>If this is set, then the error records are sent to the topic <code class="literal">foo-dlq</code>. If this is not set, then it will create a DLQ
topic with the name <code class="literal">error.&lt;input-topic-name&gt;.&lt;group-name&gt;</code>.</p><p>A couple of things to keep in mind when using the exception handling feature in Kafka Streams binder.</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">The property <code class="literal">spring.cloud.stream.kafka.streams.binder.serdeError</code> is applicable for the entire application. This implies
that if there are multiple <code class="literal">StreamListener</code> methods in the same application, this property is applied to all of them.</li><li class="listitem">The exception handling for deserialization works consistently with native deserialization and framework provided message
conversion.</li></ul></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_handling_non_deserialization_exceptions" href="#_handling_non_deserialization_exceptions"></a>39.7.2&nbsp;Handling Non-Deserialization Exceptions</h3></div></div></div><p>For general error handling in Kafka Streams binder, it is up to the end user applications to handle application level errors.
As a side effect of providing a DLQ for deserialization exception handlers, Kafka Streams binder provides a way to get
access to the DLQ sending bean directly from your application.
Once you get access to that bean, you can programmatically send any exception records from your application to the DLQ.</p><p>Robust error handling remains hard to achieve with the high-level DSL; Kafka Streams doesn&#8217;t natively support error
handling yet.</p><p>However, when you use the low-level Processor API in your application, there are options to control this behavior. See
below.</p><pre class="screen">@Autowired
private SendToDlqAndContinue dlqHandler;

@StreamListener("input")
@SendTo("output")
public KStream&lt;?, WordCount&gt; process(KStream&lt;Object, String&gt; input) {
    input.process(() -&gt; new Processor() {
        ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(Object o, Object o2) {
            try {
                .....
                .....
            }
            catch(Exception e) {
                // Explicitly provide the Kafka topic corresponding to the input binding as the first argument.
                // The DLQ handler will correctly map to the DLQ topic from the actual incoming destination.
                dlqHandler.sendToDlq("topic-name", (byte[]) o, (byte[]) o2, context.partition());
            }
        }
        .....
        .....
    });
}</pre></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_state_store" href="#_state_store"></a>39.8&nbsp;State Store</h2></div></div></div><p>State store is created automatically by Kafka Streams when the DSL is used.
When the Processor API is used, you need to register a state store manually. In order to do so, you can use the <code class="literal">KafkaStreamsStateStore</code> annotation.
You can specify the name and type of the store, flags to control logging and to disable caching, etc.
Once the store is created by the binder during the bootstrapping phase, you can access this state store through the Processor API.
Below are some primitives for doing this.</p><p>Creating a state store:</p><pre class="screen">@KafkaStreamsStateStore(name="mystate", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs=300000)
public void process(KStream&lt;Object, Product&gt; input) {
    ...
}</pre><p>Accessing the state store:</p><pre class="screen">new Processor&lt;Object, Product&gt;() {

    WindowStore&lt;Object, String&gt; state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore&lt;Object, String&gt;) processorContext.getStateStore("mystate");
    }
    ...
}</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_interactive_queries" href="#_interactive_queries"></a>39.9&nbsp;Interactive Queries</h2></div></div></div><p>As part of the public Kafka Streams binder API, we expose a class called <code class="literal">InteractiveQueryService</code>.
You can access it as a Spring bean in your application. An easy way to do so is to autowire the bean.</p><pre class="screen">@Autowired
private InteractiveQueryService interactiveQueryService;</pre><p>Once you have access to this bean, you can query the particular state store that you are interested in, as follows:</p><pre class="screen">ReadOnlyKeyValueStore&lt;Object, Object&gt; keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());</pre><p>If multiple instances of the Kafka Streams application are running, then before you can query them interactively, you need to identify which application instance hosts the key.
The <code class="literal">InteractiveQueryService</code> API provides methods for identifying the host information.</p><p>For this to work, you must configure the <code class="literal">application.server</code> property as follows:</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.configuration.application.server: &lt;server&gt;:&lt;port&gt;</pre><p>The following snippet shows how to check whether the key is hosted by the current instance:</p><pre class="screen">org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_accessing_the_underlying_kafkastreams_object" href="#_accessing_the_underlying_kafkastreams_object"></a>39.10&nbsp;Accessing the underlying KafkaStreams object</h2></div></div></div><p>The <code class="literal">StreamsBuilderFactoryBean</code> from spring-kafka, which is responsible for constructing the <code class="literal">KafkaStreams</code> object, can be accessed programmatically.
Each <code class="literal">StreamsBuilderFactoryBean</code> is registered under a name formed from <code class="literal">stream-builder</code> with the <code class="literal">StreamListener</code> method name appended.
For example, if your <code class="literal">StreamListener</code> method is named <code class="literal">process</code>, the stream builder bean is named <code class="literal">stream-builder-process</code>.
Because this is a factory bean, you must prepend an ampersand (<code class="literal">&amp;</code>) to the bean name when accessing it programmatically.
The following example assumes that the <code class="literal">StreamListener</code> method is named <code class="literal">process</code>:</p><pre class="screen">StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&amp;stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_state_cleanup" href="#_state_cleanup"></a>39.11&nbsp;State Cleanup</h2></div></div></div><p>By default, the <code class="literal">KafkaStreams.cleanUp()</code> method is called when the binding is stopped.
See <a class="link" href="https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration" target="_top">the Spring Kafka documentation</a>.
To modify this behavior simply add a single <code class="literal">CleanupConfig</code> <code class="literal">@Bean</code> (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.</p></div></div><div class="navfooter"><hr><table width="100%" summary="Navigation footer"><tr><td width="40%" align="left"><a accesskey="p" href="multi__apache_kafka_binder.html">Prev</a>&nbsp;</td><td width="20%" align="center"><a accesskey="u" href="multi__binder_implementations.html">Up</a></td><td width="40%" align="right">&nbsp;<a accesskey="n" href="multi__rabbitmq_binder.html">Next</a></td></tr><tr><td width="40%" align="left" valign="top">38.&nbsp;Apache Kafka Binder&nbsp;</td><td width="20%" align="center"><a accesskey="h" href="multi_spring-cloud.html">Home</a></td><td width="40%" align="right" valign="top">&nbsp;40.&nbsp;RabbitMQ Binder</td></tr></table></div></body></html>