<html><head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>40. Apache Kafka Streams Binder</title><link rel="stylesheet" type="text/css" href="css/manual-multipage.css"><meta name="generator" content="DocBook XSL Stylesheets V1.79.1"><link rel="home" href="multi_spring-cloud.html" title="Spring Cloud"><link rel="up" href="multi__binder_implementations.html" title="Part VI. Binder Implementations"><link rel="prev" href="multi__apache_kafka_binder.html" title="39. Apache Kafka Binder"><link rel="next" href="multi__rabbitmq_binder.html" title="41. RabbitMQ Binder"></head><body bgcolor="white" text="black" link="#0000FF" vlink="#840084" alink="#0000FF"><div class="navheader"><table width="100%" summary="Navigation header"><tr><th colspan="3" align="center">40. Apache Kafka Streams Binder</th></tr><tr><td width="20%" align="left"><a accesskey="p" href="multi__apache_kafka_binder.html">Prev</a> </td><th width="60%" align="center">Part VI. Binder Implementations</th><td width="20%" align="right"> <a accesskey="n" href="multi__rabbitmq_binder.html">Next</a></td></tr></table><hr></div><div class="chapter"><div class="titlepage"><div><div><h2 class="title"><a name="_apache_kafka_streams_binder" href="#_apache_kafka_streams_binder"></a>40. Apache Kafka Streams Binder</h2></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_usage_2" href="#_usage_2"></a>40.1 Usage</h2></div></div></div><p>For using the Kafka Streams binder, you just need to add it to your Spring Cloud Stream application, using the following
Maven coordinates:</p><pre class="programlisting"><span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;dependency&gt;</span>
  <span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;groupId&gt;</span>org.springframework.cloud<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/groupId&gt;</span>
  <span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;artifactId&gt;</span>spring-cloud-stream-binder-kafka-streams<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/artifactId&gt;</span>
<span xmlns:d="http://docbook.org/ns/docbook" class="hl-tag">&lt;/dependency&gt;</span></pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_kafka_streams_binder_overview" href="#_kafka_streams_binder_overview"></a>40.2 Kafka Streams Binder Overview</h2></div></div></div><p>Spring Cloud Stream’s Apache Kafka support also includes a binder implementation designed explicitly for Apache Kafka
Streams binding. With this native integration, a Spring Cloud Stream "processor" application can directly use the
<a class="link" href="https://kafka.apache.org/documentation/streams/developer-guide" target="_top">Apache Kafka Streams</a> APIs in the core business logic.</p><p>Kafka Streams binder implementation builds on the foundation provided by the <a class="link" href="http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams" target="_top">Kafka Streams in Spring Kafka</a>
project.</p><p>Kafka Streams binder provides binding capabilities for the three major types in Kafka Streams - KStream, KTable and GlobalKTable.</p><p>As part of this native integration, the high-level <a class="link" href="https://docs.confluent.io/current/streams/developer-guide/dsl-api.html" target="_top">Streams DSL</a>
provided by the Kafka Streams API is available for use in the business logic.</p><p>An early version of the <a class="link" href="https://docs.confluent.io/current/streams/developer-guide/processor-api.html" target="_top">Processor API</a>
support is available as well.</p><p>As noted earlier, Kafka Streams support in Spring Cloud Stream is available only in the Processor model:
messages are read from an inbound topic, business processing is applied, and the transformed messages
are written to an outbound topic. It can also be used in Processor applications with no outbound destination.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_streams_dsl" href="#_streams_dsl"></a>40.2.1 Streams DSL</h3></div></div></div><p>This application consumes data from a Kafka topic (e.g., <code class="literal">words</code>), computes a word count for each unique word in a 5-second
time window, and sends the computed results to a downstream topic (e.g., <code class="literal">counts</code>) for further processing.</p><pre class="screen">@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream&lt;?, WordCount&gt; process(KStream&lt;?, String&gt; input) {
        return input
                // split each incoming line into lower-case words
                .flatMapValues(value -&gt; Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -&gt; value)
                // window length is given in milliseconds (5 seconds)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -&gt; new KeyValue&lt;&gt;(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}</pre><p>Once built as an uber-jar (e.g., <code class="literal">wordcount-processor.jar</code>), you can run the above example as follows.</p><pre class="screen">java -jar wordcount-processor.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts</pre><p>This application consumes messages from the Kafka topic <code class="literal">words</code> and publishes the computed results to an output
topic <code class="literal">counts</code>.</p><p>Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
required in the processor. Setting up the Streams DSL specific configuration required by the Kafka Streams infrastructure
is automatically handled by the framework.</p></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_configuration_options_3" href="#_configuration_options_3"></a>40.3 Configuration Options</h2></div></div></div><p>This section contains the configuration options used by the Kafka Streams binder.</p><p>For common configuration options and properties pertaining to binder, refer to the <a class="link" href="multi__configuration_options.html#binding-properties" title="31.2 Binding Properties">core documentation</a>.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_kafka_streams_properties" href="#_kafka_streams_properties"></a>40.3.1 Kafka Streams Properties</h3></div></div></div><p>The following properties are available at the binder level and must be prefixed with <code class="literal">spring.cloud.stream.kafka.streams.binder.</code>
literal.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">configuration</span></dt><dd> Map of key/value pairs containing properties of the Apache Kafka Streams API.
This property must be prefixed with <code class="literal">spring.cloud.stream.kafka.streams.binder.</code>.
The following are some examples of using this property.</dd></dl></div><pre class="screen">spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000</pre><p>For more information about all the properties that may go into the streams configuration, see the StreamsConfig JavaDocs in the
Apache Kafka Streams docs.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">brokers</span></dt><dd><p class="simpara">Broker URL</p><p class="simpara">Default: <code class="literal">localhost</code></p></dd><dt><span class="term">zkNodes</span></dt><dd><p class="simpara">Zookeeper URL</p><p class="simpara">Default: <code class="literal">localhost</code></p></dd><dt><span class="term">serdeError</span></dt><dd><p class="simpara">Deserialization error handler type.
Possible values are - <code class="literal">logAndContinue</code>, <code class="literal">logAndFail</code> or <code class="literal">sendToDlq</code></p><p class="simpara">Default: <code class="literal">logAndFail</code></p></dd><dt><span class="term">applicationId</span></dt><dd><p class="simpara">Convenient way to set the application.id for the Kafka Streams application globally at the binder level.
If the application contains multiple <code class="literal">StreamListener</code> methods, then application.id should be set at the binding level per input binding.</p><p class="simpara">Default: <code class="literal">none</code></p></dd></dl></div><p>The following properties are <span class="emphasis"><em>only</em></span> available for Kafka Streams producers and must be prefixed with <code class="literal">spring.cloud.stream.kafka.streams.bindings.&lt;binding name&gt;.producer.</code>.
For convenience, if there are multiple output bindings and they all require a common value, it can be configured by using the prefix <code class="literal">spring.cloud.stream.kafka.streams.default.producer.</code>.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">keySerde</span></dt><dd><p class="simpara">key serde to use</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">valueSerde</span></dt><dd><p class="simpara">value serde to use</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">useNativeEncoding</span></dt><dd><p class="simpara">flag to enable native encoding</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd></dl></div><p>The following properties are <span class="emphasis"><em>only</em></span> available for Kafka Streams consumers and must be prefixed with <code class="literal">spring.cloud.stream.kafka.streams.bindings.&lt;binding name&gt;.consumer.</code>.
For convenience, if there are multiple input bindings and they all require a common value, it can be configured by using the prefix <code class="literal">spring.cloud.stream.kafka.streams.default.consumer.</code>.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">applicationId</span></dt><dd><p class="simpara">Setting application.id per input binding.</p><p class="simpara">Default: <code class="literal">none</code></p></dd><dt><span class="term">keySerde</span></dt><dd><p class="simpara">key serde to use</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">valueSerde</span></dt><dd><p class="simpara">value serde to use</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">materializedAs</span></dt><dd><p class="simpara">state store to materialize when using incoming KTable types</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">useNativeDecoding</span></dt><dd><p class="simpara">flag to enable native decoding</p><p class="simpara">Default: <code class="literal">false</code>.</p></dd><dt><span class="term">dlqName</span></dt><dd><p class="simpara">DLQ topic name.</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd></dl></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_timewindow_properties" href="#_timewindow_properties"></a>40.3.2 TimeWindow properties:</h3></div></div></div><p>Windowing is an important concept in stream processing applications. The following properties are available to configure
time-window computations.</p><div class="variablelist"><dl class="variablelist"><dt><span class="term">spring.cloud.stream.kafka.streams.timeWindow.length</span></dt><dd><p class="simpara">When this property is given, you can autowire a <code class="literal">TimeWindows</code> bean into the application.
The value is expressed in milliseconds.</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd><dt><span class="term">spring.cloud.stream.kafka.streams.timeWindow.advanceBy</span></dt><dd><p class="simpara">Value is given in milliseconds.</p><p class="simpara">Default: <code class="literal">none</code>.</p></dd></dl></div></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_multiple_input_bindings" href="#_multiple_input_bindings"></a>40.4 Multiple Input Bindings</h2></div></div></div><p>For use cases that require multiple incoming KStream objects or a combination of KStream and KTable objects, the Kafka
Streams binder provides multiple bindings support.</p><p>Let’s see it in action.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_multiple_input_bindings_as_a_sink" href="#_multiple_input_bindings_as_a_sink"></a>40.4.1 Multiple Input Bindings as a Sink</h3></div></div></div><pre class="screen">@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream&lt;String, PlayEvent&gt; playEvents,
                    @Input("inputTable") KTable&lt;Long, Song&gt; songTable) {
    ....
    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream&lt;?, ?&gt; inputStream();

    @Input("inputTable")
    KTable&lt;?, ?&gt; inputTable();
}</pre><p>In the above example, the application is written as a sink, i.e. there are no output bindings and the application has to
decide how to handle downstream processing. When you write applications in this style, you might want to send the information
downstream or store it in a state store (see below for queryable state stores).</p><p>In the case of an incoming KTable, if you want to materialize the computations to a state store, you have to express it
through the following property.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.inputTable.consumer.materializedAs: all-songs</pre><p>The above example shows the use of KTable as an input binding.
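</p><p>To make the table semantics concrete: a KTable treats each record on its topic as an update for that key, so the table holds the latest value per key, and a record with a null value removes the key. The following is a minimal plain-Java sketch of that idea, not the Kafka Streams API; <code class="literal">TableViewSketch</code> and its methods are hypothetical names used only for illustration.</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java stand-in for a table view over a changelog topic (hypothetical, not Kafka Streams API). */
final class TableViewSketch {

    // Latest value seen for each key.
    private final Map<Long, String> latest = new LinkedHashMap<>();

    /** Applies one changelog record: a null value deletes the key, any other value upserts it. */
    void apply(Long key, String value) {
        if (value == null) {
            latest.remove(key);
        } else {
            latest.put(key, value);
        }
    }

    /** Returns the current value for the key, or null if the key is absent. */
    String lookup(Long key) {
        return latest.get(key);
    }

    /** Number of keys currently present in the table view. */
    int size() {
        return latest.size();
    }
}
```

<p>In a stream-table join, each incoming stream record would perform such a lookup against the current contents of the table.</p><p>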
The binder also supports input bindings for GlobalKTable.
A GlobalKTable binding is useful when you have to ensure that all instances of your application have access to the data updates from the topic.
KTable and GlobalKTable bindings are only available on the input.
The binder supports both input and output bindings for KStream.</p></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_multiple_input_bindings_as_a_processor" href="#_multiple_input_bindings_as_a_processor"></a>40.4.2 Multiple Input Bindings as a Processor</h3></div></div></div><pre class="screen">@EnableBinding(KStreamKTableBinding.class)
....
....

@StreamListener
@SendTo("output")
public KStream&lt;String, Long&gt; process(@Input("input") KStream&lt;String, Long&gt; userClicksStream,
                                     @Input("inputTable") KTable&lt;String, String&gt; userRegionsTable) {
    ....
    ....
}

// KafkaStreamsProcessor already declares the "input" and "output" KStream bindings
interface KStreamKTableBinding extends KafkaStreamsProcessor {

    @Input("inputTable")
    KTable&lt;?, ?&gt; inputTable();
}</pre></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_multiple_output_bindings_aka_branching" href="#_multiple_output_bindings_aka_branching"></a>40.5 Multiple Output Bindings (aka Branching)</h2></div></div></div><p>Kafka Streams allows outbound data to be split into multiple topics based on some predicates. The Kafka Streams binder provides
support for this feature without compromising the programming model exposed through <code class="literal">StreamListener</code> in the end user application.</p><p>You can write the application in the usual way as demonstrated above in the word count example. However, when using the
branching feature, you are required to do a few things. First, you need to make sure that your return type is <code class="literal">KStream[]</code>
instead of a regular <code class="literal">KStream</code>. Second, you need to use the <code class="literal">SendTo</code> annotation listing the output bindings in the same order as the branches
(see example below). For each of these output bindings, you need to configure destination, content-type etc., complying with
the standard Spring Cloud Stream expectations.</p><p>Here is an example:</p><pre class="screen">@EnableBinding(KStreamProcessorWithBranches.class)
@EnableAutoConfiguration
public static class WordCountProcessorApplication {

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo({"output1","output2","output3"})
    public KStream&lt;?, WordCount&gt;[] process(KStream&lt;Object, String&gt; input) {

        Predicate&lt;Object, WordCount&gt; isEnglish = (k, v) -&gt; v.word.equals("english");
        Predicate&lt;Object, WordCount&gt; isFrench = (k, v) -&gt; v.word.equals("french");
        Predicate&lt;Object, WordCount&gt; isSpanish = (k, v) -&gt; v.word.equals("spanish");

        return input
                .flatMapValues(value -&gt; Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -&gt; value)
                .windowedBy(timeWindows)
                .count(Materialized.as("WordCounts-1"))
                .toStream()
                .map((key, value) -&gt; new KeyValue&lt;&gt;(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
                // one KStream per predicate, in the same order as the bindings in @SendTo
                .branch(isEnglish, isFrench, isSpanish);
    }

    interface KStreamProcessorWithBranches {

        @Input("input")
        KStream&lt;?, ?&gt; input();

        @Output("output1")
        KStream&lt;?, ?&gt; output1();

        @Output("output2")
        KStream&lt;?, ?&gt; output2();

        @Output("output3")
        KStream&lt;?, ?&gt; output3();
    }
}</pre><p>Properties:</p><pre class="screen">spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/json
spring.cloud.stream.bindings.output3.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output1:
  destination: foo
  producer:
    headerMode: raw
spring.cloud.stream.bindings.output2:
  destination: bar
  producer:
    headerMode: raw
spring.cloud.stream.bindings.output3:
  destination: fox
  producer:
    headerMode: raw
spring.cloud.stream.bindings.input:
  destination: words
  consumer:
    headerMode: raw</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_message_conversion" href="#_message_conversion"></a>40.6 Message Conversion</h2></div></div></div><p>Similar to message-channel based binder applications, the Kafka Streams binder adapts to the out-of-the-box content-type
conversions without any compromise.</p><p>It is typical for Kafka Streams operations to know the type of SerDe’s used to transform the key and value correctly.
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself for
the inbound and outbound conversions rather than using the content-type conversions offered by the framework.
On the other hand, you might already be familiar with the content-type conversion patterns provided by the framework and
would like to continue using them for inbound and outbound conversions.</p><p>Both options are supported in the Kafka Streams binder implementation.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_outbound_serialization" href="#_outbound_serialization"></a>40.6.1 Outbound serialization</h3></div></div></div><p>If native encoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default <code class="literal">application/json</code> will be applied). In this case, it ignores any SerDe set on the outbound
for outbound serialization.</p><p>Here is the property to set the contentType on the outbound.</p><pre class="screen">spring.cloud.stream.bindings.output.contentType: application/json</pre><p>Here is the property to enable native encoding.</p><pre class="screen">spring.cloud.stream.bindings.output.producer.useNativeEncoding: true</pre><p>If native encoding is enabled on the output binding (the user has to enable it explicitly, as above), then the framework will
skip any form of automatic message conversion on the outbound. In that case, it will switch to the SerDe set by the user.
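</p><p>The outbound rules in this section can be read as a small decision procedure: with native encoding, a SerDe is used (the binding-level <code class="literal">valueSerde</code> if present, otherwise the binder-wide default); without it, the binding's contentType is used, falling back to <code class="literal">application/json</code>. The following plain-Java sketch only reports which mechanism would apply for a binding. It assumes a flat map of property names to values; <code class="literal">OutboundSerializationSketch</code> and <code class="literal">resolve</code> are hypothetical names, and it does not reproduce the binder's actual implementation.</p>

```java
import java.util.Map;

/** Hypothetical helper that mirrors the outbound rules described in this section. */
final class OutboundSerializationSketch {

    private OutboundSerializationSketch() {
    }

    /**
     * Reports which serialization mechanism applies to the given output binding.
     * Property keys are the ones shown in this section; the map itself is an assumption.
     */
    static String resolve(boolean nativeEncoding, Map<String, String> props, String binding) {
        if (nativeEncoding) {
            // Binding-level valueSerde wins over the binder-wide default value serde.
            String serde = props.get(
                    "spring.cloud.stream.kafka.streams.bindings." + binding + ".producer.valueSerde");
            if (serde == null) {
                serde = props.get(
                        "spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde");
            }
            return "serde:" + (serde == null ? "unset" : serde);
        }
        // Framework conversion: contentType set by the user, otherwise application/json.
        String contentType = props.getOrDefault(
                "spring.cloud.stream.bindings." + binding + ".contentType", "application/json");
        return "contentType:" + contentType;
    }
}
```

<p>For example, with a default value serde configured and a <code class="literal">valueSerde</code> set only on <code class="literal">output1</code>, this sketch would report the binding-level SerDe for <code class="literal">output1</code> and the default for every other binding.</p><p>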
The <code class="literal">valueSerde</code> property set on the actual output binding will be used. Here is an example.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde</pre><p>If this property is not set, then it will use the "default" SerDe: <code class="literal">spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde</code>.</p><p>It is worth mentioning that the Kafka Streams binder does not serialize the keys on outbound - it simply relies on Kafka itself.
Therefore, you either have to specify the <code class="literal">keySerde</code> property on the binding or it will default to the application-wide common
<code class="literal">keySerde</code>.</p><p>Binding level key serde:</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde</pre><p>Common Key serde:</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde</pre><p>If branching is used, then you need to use multiple output bindings. For example,</p><pre class="screen">interface KStreamProcessorWithBranches {

    @Input("input")
    KStream&lt;?, ?&gt; input();

    @Output("output1")
    KStream&lt;?, ?&gt; output1();

    @Output("output2")
    KStream&lt;?, ?&gt; output2();

    @Output("output3")
    KStream&lt;?, ?&gt; output3();
}</pre><p>If native encoding is enabled, then you can set different SerDe’s on individual output bindings as below.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde</pre><p>Then if you have <code class="literal">SendTo</code> like this, <code class="literal">@SendTo({"output1", "output2", "output3"})</code>, the <code class="literal">KStream[]</code> from the branches are
applied with proper SerDe objects as defined above. If you are not enabling native encoding, you can then set different
contentType values on the output bindings as below. In that case, the framework will use the appropriate message converter
to convert the messages before sending to Kafka.</p><pre class="screen">spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/x-java-serialized-object
spring.cloud.stream.bindings.output3.contentType: application/octet-stream</pre></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_inbound_deserialization" href="#_inbound_deserialization"></a>40.6.2 Inbound Deserialization</h3></div></div></div><p>Similar rules apply to data deserialization on the inbound.</p><p>If native decoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default <code class="literal">application/json</code> will be applied). In this case, it ignores any SerDe set on the inbound
for inbound deserialization.</p><p>Here is the property to set the contentType on the inbound.</p><pre class="screen">spring.cloud.stream.bindings.input.contentType: application/json</pre><p>Here is the property to enable native decoding.</p><pre class="screen">spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true</pre><p>If native decoding is enabled on the input binding (the user has to enable it explicitly, as above), then the framework will
skip doing any message conversion on the inbound. In that case, it will switch to the SerDe set by the user. The <code class="literal">valueSerde</code>
property set on the actual input binding will be used. Here is an example.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde</pre><p>If this property is not set, it will use the default SerDe: <code class="literal">spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde</code>.</p><p>It is worth mentioning that the Kafka Streams binder does not deserialize the keys on inbound - it simply relies on Kafka itself.
Therefore, you either have to specify the <code class="literal">keySerde</code> property on the binding or it will default to the application-wide common
<code class="literal">keySerde</code>.</p><p>Binding level key serde:</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde</pre><p>Common Key serde:</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde</pre><p>As in the case of KStream branching on the outbound, the benefit of setting value SerDe per binding is that if you have
multiple input bindings (multiple KStream objects) and they all require separate value SerDe’s, then you can configure
them individually. If you use the common configuration approach, then this feature won’t be applicable.</p></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_error_handling" href="#_error_handling"></a>40.7 Error Handling</h2></div></div></div><p>Apache Kafka Streams provides the capability for natively handling exceptions from deserialization errors.
For details on this support, please see <a class="link" href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers" target="_top">KIP-161</a>.
Out of the box, Apache Kafka Streams provides two kinds of deserialization exception handlers - <code class="literal">logAndContinue</code> and <code class="literal">logAndFail</code>.
As the names indicate, the former logs the error and continues processing the next records, and the latter logs the
error and fails. <code class="literal">logAndFail</code> is the default deserialization exception handler.</p><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_handling_deserialization_exceptions" href="#_handling_deserialization_exceptions"></a>40.7.1 Handling Deserialization Exceptions</h3></div></div></div><p>Kafka Streams binder supports a selection of exception handlers through the following properties.</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue</pre><p>In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous
records (poison pills) to a DLQ topic. Here is how you enable this DLQ exception handler.</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq</pre><p>When the above property is set, all the deserialization error records are automatically sent to the DLQ topic.</p><pre class="screen">spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq</pre><p>If this is set, then the error records are sent to the topic <code class="literal">foo-dlq</code>. If this is not set, then it will create a DLQ
topic with the name <code class="literal">error.&lt;input-topic-name&gt;.&lt;group-name&gt;</code>.</p><p>A couple of things to keep in mind when using the exception handling feature in Kafka Streams binder.</p><div class="itemizedlist"><ul class="itemizedlist" style="list-style-type: disc; "><li class="listitem">The property <code class="literal">spring.cloud.stream.kafka.streams.binder.serdeError</code> is applicable for the entire application. This implies
that if there are multiple <code class="literal">StreamListener</code> methods in the same application, this property is applied to all of them.</li><li class="listitem">The exception handling for deserialization works consistently with native deserialization and framework provided message
conversion.</li></ul></div></div><div class="section"><div class="titlepage"><div><div><h3 class="title"><a name="_handling_non_deserialization_exceptions" href="#_handling_non_deserialization_exceptions"></a>40.7.2 Handling Non-Deserialization Exceptions</h3></div></div></div><p>For general error handling in the Kafka Streams binder, it is up to the end user applications to handle application level errors.
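</p><p>Handling application-level errors usually comes down to deciding, per record, whether to skip the failing record, stop processing, or divert the record elsewhere. The plain-Java sketch below mirrors the three choices the binder offers for deserialization errors (<code class="literal">logAndContinue</code>, <code class="literal">logAndFail</code>, <code class="literal">sendToDlq</code>), using in-memory lists in place of Kafka topics. <code class="literal">ErrorPolicySketch</code> and all of its members are hypothetical names; this is an illustration of the pattern, not binder API.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical per-record error policy; lists stand in for the output and DLQ topics. */
final class ErrorPolicySketch {

    enum Policy { LOG_AND_CONTINUE, LOG_AND_FAIL, SEND_TO_DLQ }

    final List<String> output = new ArrayList<>();
    final List<String> dlq = new ArrayList<>();

    /** Applies the business logic to each record and reacts to failures according to the policy. */
    void run(List<String> records, Function<String, String> logic, Policy policy) {
        for (String record : records) {
            try {
                output.add(logic.apply(record));
            } catch (RuntimeException e) {
                System.err.println("Failed to process '" + record + "': " + e);
                switch (policy) {
                    case LOG_AND_FAIL:
                        // Stop processing by propagating the failure.
                        throw e;
                    case SEND_TO_DLQ:
                        // Keep the failing record for later inspection, then move on.
                        dlq.add(record);
                        break;
                    default:
                        // LOG_AND_CONTINUE: the record is dropped after logging.
                        break;
                }
            }
        }
    }
}
```

<p>With the divert policy, good records keep flowing to the output while the failing ones remain available for inspection or replay.</p><p>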
As a side effect of providing a DLQ for deserialization exception handlers, Kafka Streams binder provides a way to get
access to the DLQ sending bean directly from your application.
Once you get access to that bean, you can programmatically send any exception records from your application to the DLQ.</p><p>Robust error handling remains hard with the high-level DSL; Kafka Streams doesn’t natively support error
handling yet.</p><p>However, when you use the low-level Processor API in your application, there are options to control this behavior. See
below.</p><pre class="screen">@Autowired
private SendToDlqAndContinue dlqHandler;

@StreamListener("input")
@SendTo("output")
public KStream&lt;?, WordCount&gt; process(KStream&lt;Object, String&gt; input) {

    input.process(() -&gt; new Processor() {
        ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(Object o, Object o2) {

            try {
                .....
                .....
            }
            catch(Exception e) {
                //explicitly provide the kafka topic corresponding to the input binding as the first argument.
                //DLQ handler will correctly map to the dlq topic from the actual incoming destination.
                dlqHandler.sendToDlq("topic-name", (byte[]) o, (byte[]) o2, context.partition());
            }
        }

        .....
        .....
    });
}</pre></div></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_state_store" href="#_state_store"></a>40.8 State Store</h2></div></div></div><p>State store is created automatically by Kafka Streams when the DSL is used.
When the Processor API is used, you need to register a state store manually. To do so, you can use the <code class="literal">KafkaStreamsStateStore</code> annotation.
You can specify the name and type of the store, flags to control logging and to disable caching, etc.
Once the store is created by the binder during the bootstrapping phase, you can access this state store through the Processor API.
Below are some primitives for doing this.</p><p>Creating a state store:</p><pre class="screen">@KafkaStreamsStateStore(name="mystate", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs=300000)
|
|
public void process(KStream<Object, Product> input) {
|
|
...
|
|
}</pre><p>Accessing the state store:</p><pre class="screen">Processor<Object, Product>() {
|
|
|
|
WindowStore<Object, String> state;
|
|
|
|
@Override
|
|
public void init(ProcessorContext processorContext) {
|
|
state = (WindowStore)processorContext.getStateStore("mystate");
|
|
}
|
|
...
|
|
}</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_interactive_queries" href="#_interactive_queries"></a>40.9 Interactive Queries</h2></div></div></div><p>As part of the public Kafka Streams binder API, we expose a class called <code class="literal">InteractiveQueryService</code>.
You can access it as a Spring bean in your application. An easy way to get hold of this bean is to autowire it.</p><pre class="screen">@Autowired
private InteractiveQueryService interactiveQueryService;</pre><p>Once you have access to this bean, you can query the particular state store that you are interested in. See below.</p><pre class="screen">ReadOnlyKeyValueStore<Object, Object> keyValueStore =
    interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());</pre><p>If multiple instances of the Kafka Streams application are running, then before you can query them interactively, you need to identify which application instance hosts the key.
The <code class="literal">InteractiveQueryService</code> API provides methods for identifying the host information.</p><p>For this to work, you must configure the <code class="literal">application.server</code> property as follows:</p><pre class="screen">spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port></pre><p>Here are some code snippets:</p><pre class="screen">org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
        key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_accessing_the_underlying_kafkastreams_object" href="#_accessing_the_underlying_kafkastreams_object"></a>40.10 Accessing the underlying KafkaStreams object</h2></div></div></div><p>The <code class="literal">StreamsBuilderFactoryBean</code> from spring-kafka, which is responsible for constructing the <code class="literal">KafkaStreams</code> object, can be accessed programmatically.
Each <code class="literal">StreamsBuilderFactoryBean</code> is registered under a name formed from <code class="literal">stream-builder</code> with the <code class="literal">StreamListener</code> method name appended.
If your <code class="literal">StreamListener</code> method is named <code class="literal">process</code>, for example, the stream builder bean is named <code class="literal">stream-builder-process</code>.
Since this is a factory bean, prepend an ampersand (<code class="literal">&</code>) to the bean name when accessing it programmatically.
The following example assumes that the <code class="literal">StreamListener</code> method is named <code class="literal">process</code>:</p><pre class="screen">StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();</pre></div><div class="section"><div class="titlepage"><div><div><h2 class="title" style="clear: both"><a name="_state_cleanup" href="#_state_cleanup"></a>40.11 State Cleanup</h2></div></div></div><p>By default, the <code class="literal">KafkaStreams.cleanUp()</code> method is called when the binding is stopped.
See <a class="link" href="https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration" target="_top">the Spring Kafka documentation</a>.
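</p><p>For illustration, here is a minimal sketch of a <code class="literal">CleanupConfig</code> bean that turns cleanup off on both start and stop. It assumes spring-kafka's two-argument constructor <code class="literal">CleanupConfig(boolean cleanupOnStart, boolean cleanupOnStop)</code>; the configuration class name <code class="literal">StateCleanupConfiguration</code> is hypothetical and not part of the binder.</p><pre class="screen">import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.CleanupConfig;

// Hypothetical configuration class; only CleanupConfig comes from spring-kafka.
@Configuration
public class StateCleanupConfiguration {

    // Assumed argument order: (cleanupOnStart, cleanupOnStop).
    // Passing false for both means local state is not wiped automatically.
    @Bean
    public CleanupConfig cleanupConfig() {
        return new CleanupConfig(false, false);
    }
}</pre><p>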
To modify this behavior simply add a single <code class="literal">CleanupConfig</code> <code class="literal">@Bean</code> (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.</p></div></div><div class="navfooter"><hr><table width="100%" summary="Navigation footer"><tr><td width="40%" align="left"><a accesskey="p" href="multi__apache_kafka_binder.html">Prev</a> </td><td width="20%" align="center"><a accesskey="u" href="multi__binder_implementations.html">Up</a></td><td width="40%" align="right"> <a accesskey="n" href="multi__rabbitmq_binder.html">Next</a></td></tr><tr><td width="40%" align="left" valign="top">39. Apache Kafka Binder </td><td width="20%" align="center"><a accesskey="h" href="multi_spring-cloud.html">Home</a></td><td width="40%" align="right" valign="top"> 41. RabbitMQ Binder</td></tr></table></div></body></html> |