Fix docs
@@ -46,6 +46,12 @@
|
||||
<groupId>pl.project13.maven</groupId>
|
||||
<artifactId>git-commit-id-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-gpg-plugin</artifactId>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
</plugin>
|
||||
|
||||
@@ -10,9 +10,12 @@ The reference documentation consists of the following sections:
|
||||
[horizontal]
|
||||
<<spring-cloud-stream.adoc#spring-cloud-stream-reference,Overview>> :: History, Quick Start, Concepts, Architecture Overview, Binder Abstraction, and Core Features
|
||||
|
||||
https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/{project-version}/reference/html/spring-cloud-stream-binder-rabbit.html[Rabbit MQ Binder] :: Spring Cloud Stream binder reference for Rabbit MQ
|
||||
https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/{project-version}/reference/html/spring-cloud-stream-binder-kafka.html#_apache_kafka_binder[Apache Kafka Binder] :: Spring Cloud Stream binder reference for Apache Kafka
|
||||
https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/{project-version}/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder[Apache Kafka Streams Binder] :: Spring Cloud Stream binder reference for Apache Kafka Streams
|
||||
<<spring-cloud-stream-binder-rabbit#spring-cloud-stream-binder-rabbit-reference,Rabbit MQ Binder>> :: Spring Cloud Stream binder reference for Rabbit MQ
|
||||
<<spring-cloud-stream-binder-kafka#_apache_kafka_binder,Apache Kafka Binder>> :: Spring Cloud Stream binder reference for Apache Kafka
|
||||
<<spring-cloud-stream-binder-kafka#_kafka_streams_binder,Apache Kafka Streams binder>> :: Spring Cloud Stream binder reference for Apache Kafka Streams
|
||||
|
||||
|
||||
|
||||
<<binders.adoc#binders,Additional Binders>> :: A collection of Partner maintained binder implementations for Spring Cloud Stream (e.g., Azure Event Hubs, Google PubSub, Solace PubSub+)
|
||||
https://github.com/spring-cloud/spring-cloud-stream-samples/[Spring Cloud Stream Samples] :: A curated collection of repeatable Spring Cloud Stream samples to walk through the features
|
||||
|
||||
|
||||
BIN
docs/src/main/asciidoc/kafka/images/kafka-binder.png
Normal file
|
After Width: | Height: | Size: 9.2 KiB |
BIN
docs/src/main/asciidoc/kafka/images/kafka-streams-initializr.png
Normal file
|
After Width: | Height: | Size: 119 KiB |
|
After Width: | Height: | Size: 233 KiB |
2249
docs/src/main/asciidoc/kafka/kafka-streams.adoc
Normal file
158
docs/src/main/asciidoc/kafka/kafka_dlq.adoc
Normal file
@@ -0,0 +1,158 @@
|
||||
[[kafka-dlq-processing]]
|
||||
=== Dead-Letter Topic Processing
|
||||
|
||||
[[dlq-partition-selection]]
|
||||
==== Dead-Letter Topic Partition Selection
|
||||
|
||||
By default, records are published to the Dead-Letter topic using the same partition as the original record.
|
||||
This means the Dead-Letter topic must have at least as many partitions as the original record.
|
||||
|
||||
To change this behavior, add a `DlqPartitionFunction` implementation as a `@Bean` to the application context.
|
||||
Only one such bean can be present.
|
||||
The function is provided with the consumer group, the failed `ConsumerRecord` and the exception.
|
||||
For example, if you always want to route to partition 0, you might use:
|
||||
|
||||
====
|
||||
[source, java]
|
||||
----
|
||||
@Bean
|
||||
public DlqPartitionFunction partitionFunction() {
|
||||
return (group, record, ex) -> 0;
|
||||
}
|
||||
----
|
||||
====
|
||||
NOTE: If you set a consumer binding's `dlqPartitions` property to 1 (and the binder's `minPartitionCount` is equal to `1`), there is no need to supply a `DlqPartitionFunction`; the framework will always use partition 0.
|
||||
If you set a consumer binding's `dlqPartitions` property to a value greater than `1` (or the binder's `minPartitionCount` is greater than `1`), you **must** provide a `DlqPartitionFunction` bean, even if the partition count is the same as the original topic's.
|
||||
|
||||
It is also possible to define a custom name for the DLQ topic.
|
||||
In order to do so, create an implementation of `DlqDestinationResolver` as a `@Bean` to the application context.
|
||||
When the binder detects such a bean, that takes precedence, otherwise it will use the `dlqName` property.
|
||||
If neither of these are found, it will default to `error.<destination>.<group>`.
|
||||
Here is an example of `DlqDestinationResolver` as a `@Bean`.
|
||||
|
||||
====
|
||||
[source]
|
||||
----
|
||||
@Bean
|
||||
public DlqDestinationResolver dlqDestinationResolver() {
|
||||
return (rec, ex) -> {
|
||||
if (rec.topic().equals("word1")) {
|
||||
return "topic1-dlq";
|
||||
}
|
||||
else {
|
||||
return "topic2-dlq";
|
||||
}
|
||||
};
|
||||
}
|
||||
----
|
||||
====
|
||||
|
||||
One important thing to keep in mind when providing an implementation for `DlqDestinationResolver` is that the provisioner in the binder will not auto create topics for the application.
|
||||
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
|
||||
Therefore, if you provide DLQ names using this strategy, it is the application's responsibility to ensure that those topics are created beforehand.
|
||||
|
||||
[[dlq-handling]]
|
||||
==== Handling Records in a Dead-Letter Topic
|
||||
|
||||
Because the framework cannot anticipate how users would want to dispose of dead-lettered messages, it does not provide any standard mechanism to handle them.
|
||||
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic.
|
||||
However, if the problem is a permanent issue, that could cause an infinite loop.
|
||||
The sample Spring Boot application within this topic is an example of how to route those messages back to the original topic, but it moves them to a "`parking lot`" topic after three attempts.
|
||||
The application is another spring-cloud-stream application that reads from the dead-letter topic.
|
||||
It exits when no messages are received for 5 seconds.
|
||||
|
||||
The examples assume the original destination is `so8400out` and the consumer group is `so8400`.
|
||||
|
||||
There are a couple of strategies to consider:
|
||||
|
||||
* Consider running the rerouting only when the main application is not running.
|
||||
Otherwise, the retries for transient errors are used up very quickly.
|
||||
* Alternatively, use a two-stage approach: Use this application to route to a third topic and another to route from there back to the main topic.
|
||||
|
||||
The following code listings show the sample application:
|
||||
|
||||
.application.properties
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.bindings.input.group=so8400replay
|
||||
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
|
||||
|
||||
spring.cloud.stream.bindings.output.destination=so8400out
|
||||
|
||||
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
|
||||
|
||||
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
|
||||
|
||||
spring.cloud.stream.kafka.binder.headers=x-retries
|
||||
----
|
||||
|
||||
.Application
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(TwoOutputProcessor.class)
|
||||
public class ReRouteDlqKApplication implements CommandLineRunner {
|
||||
|
||||
private static final String X_RETRIES_HEADER = "x-retries";
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
|
||||
}
|
||||
|
||||
private final AtomicInteger processed = new AtomicInteger();
|
||||
|
||||
@Autowired
|
||||
private MessageChannel parkingLot;
|
||||
|
||||
@StreamListener(Processor.INPUT)
|
||||
@SendTo(Processor.OUTPUT)
|
||||
public Message<?> reRoute(Message<?> failed) {
|
||||
processed.incrementAndGet();
|
||||
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
|
||||
if (retries == null) {
|
||||
System.out.println("First retry for " + failed);
|
||||
return MessageBuilder.fromMessage(failed)
|
||||
.setHeader(X_RETRIES_HEADER, new Integer(1))
|
||||
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
|
||||
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
|
||||
.build();
|
||||
}
|
||||
else if (retries.intValue() < 3) {
|
||||
System.out.println("Another retry for " + failed);
|
||||
return MessageBuilder.fromMessage(failed)
|
||||
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
|
||||
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
|
||||
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
|
||||
.build();
|
||||
}
|
||||
else {
|
||||
System.out.println("Retries exhausted for " + failed);
|
||||
parkingLot.send(MessageBuilder.fromMessage(failed)
|
||||
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
|
||||
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
|
||||
.build());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
while (true) {
|
||||
int count = this.processed.get();
|
||||
Thread.sleep(5000);
|
||||
if (count == this.processed.get()) {
|
||||
System.out.println("Idle, exiting");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public interface TwoOutputProcessor extends Processor {
|
||||
|
||||
@Output("parkingLot")
|
||||
MessageChannel parkingLot();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
966
docs/src/main/asciidoc/kafka/kafka_overview.adoc
Normal file
@@ -0,0 +1,966 @@
|
||||
[partintro]
|
||||
--
|
||||
This guide describes the Apache Kafka implementation of the Spring Cloud Stream Binder.
|
||||
It contains information about its design, usage, and configuration options, as well as information on how the Stream Cloud Stream concepts map onto Apache Kafka specific constructs.
|
||||
In addition, this guide explains the Kafka Streams binding capabilities of Spring Cloud Stream.
|
||||
--
|
||||
|
||||
== Apache Kafka Binder
|
||||
|
||||
=== Usage
|
||||
|
||||
To use Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
=== Overview
|
||||
|
||||
The following image shows a simplified diagram of how the Apache Kafka binder operates:
|
||||
|
||||
.Kafka Binder
|
||||
image::{github-raw}/docs/src/main/asciidoc/images/kafka-binder.png[width=300,scaledwidth="50%"]
|
||||
|
||||
The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic.
|
||||
The consumer group maps directly to the same Apache Kafka concept.
|
||||
Partitioning also maps directly to Apache Kafka partitions as well.
|
||||
|
||||
The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
|
||||
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
|
||||
For example, with versions earlier than 0.11.x.x, native headers are not supported.
|
||||
Also, 0.11.x.x does not support the `autoAddPartitions` property.
|
||||
|
||||
=== Configuration Options
|
||||
|
||||
This section contains the configuration options used by the Apache Kafka binder.
|
||||
|
||||
For common configuration options and properties pertaining to the binder, see the https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#binding-properties[binding properties] in core documentation.
|
||||
|
||||
==== Kafka Binder Properties
|
||||
|
||||
spring.cloud.stream.kafka.binder.brokers::
|
||||
A list of brokers to which the Kafka binder connects.
|
||||
+
|
||||
Default: `localhost`.
|
||||
spring.cloud.stream.kafka.binder.defaultBrokerPort::
|
||||
`brokers` allows hosts specified with or without port information (for example, `host1,host2:port2`).
|
||||
This sets the default port when no port is configured in the broker list.
|
||||
+
|
||||
Default: `9092`.
|
||||
spring.cloud.stream.kafka.binder.configuration::
|
||||
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder.
|
||||
Due to the fact that these properties are used by both producers and consumers, usage should be restricted to common properties -- for example, security settings.
|
||||
Unknown Kafka producer or consumer properties provided through this configuration are filtered out and not allowed to propagate.
|
||||
Properties here supersede any properties set in boot.
|
||||
+
|
||||
Default: Empty map.
|
||||
spring.cloud.stream.kafka.binder.consumerProperties::
|
||||
Key/Value map of arbitrary Kafka client consumer properties.
|
||||
In addition to support known Kafka consumer properties, unknown consumer properties are allowed here as well.
|
||||
Properties here supersede any properties set in boot and in the `configuration` property above.
|
||||
+
|
||||
Default: Empty map.
|
||||
spring.cloud.stream.kafka.binder.headers::
|
||||
The list of custom headers that are transported by the binder.
|
||||
Only required when communicating with older applications (<= 1.3.x) with a `kafka-clients` version < 0.11.0.0. Newer versions support headers natively.
|
||||
+
|
||||
Default: empty.
|
||||
spring.cloud.stream.kafka.binder.healthTimeout::
|
||||
The time to wait to get partition information, in seconds.
|
||||
Health reports as down if this timer expires.
|
||||
+
|
||||
Default: 10.
|
||||
spring.cloud.stream.kafka.binder.requiredAcks::
|
||||
The number of required acks on the broker.
|
||||
See the Kafka documentation for the producer `acks` property.
|
||||
+
|
||||
Default: `1`.
|
||||
spring.cloud.stream.kafka.binder.minPartitionCount::
|
||||
Effective only if `autoCreateTopics` or `autoAddPartitions` is set.
|
||||
The global minimum number of partitions that the binder configures on topics on which it produces or consumes data.
|
||||
It can be superseded by the `partitionCount` setting of the producer or by the value of `instanceCount * concurrency` settings of the producer (if either is larger).
|
||||
+
|
||||
Default: `1`.
|
||||
spring.cloud.stream.kafka.binder.producerProperties::
|
||||
Key/Value map of arbitrary Kafka client producer properties.
|
||||
In addition to support known Kafka producer properties, unknown producer properties are allowed here as well.
|
||||
Properties here supersede any properties set in boot and in the `configuration` property above.
|
||||
+
|
||||
Default: Empty map.
|
||||
spring.cloud.stream.kafka.binder.replicationFactor::
|
||||
The replication factor of auto-created topics if `autoCreateTopics` is active.
|
||||
Can be overridden on each binding.
|
||||
+
|
||||
NOTE: If you are using Kafka broker versions prior to 2.4, then this value should be set to at least `1`.
|
||||
Starting with version 3.0.8, the binder uses `-1` as the default value, which indicates that the broker 'default.replication.factor' property will be used to determine the number of replicas.
|
||||
Check with your Kafka broker admins to see if there is a policy in place that requires a minimum replication factor, if that's the case then, typically, the `default.replication.factor` will match that value and `-1` should be used, unless you need a replication factor greater than the minimum.
|
||||
+
|
||||
Default: `-1`.
|
||||
spring.cloud.stream.kafka.binder.autoCreateTopics::
|
||||
If set to `true`, the binder creates new topics automatically.
|
||||
If set to `false`, the binder relies on the topics being already configured.
|
||||
In the latter case, if the topics do not exist, the binder fails to start.
|
||||
+
|
||||
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
|
||||
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
|
||||
+
|
||||
Default: `true`.
|
||||
spring.cloud.stream.kafka.binder.autoAddPartitions::
|
||||
If set to `true`, the binder creates new partitions if required.
|
||||
If set to `false`, the binder relies on the partition size of the topic being already configured.
|
||||
If the partition count of the target topic is smaller than the expected value, the binder fails to start.
|
||||
+
|
||||
Default: `false`.
|
||||
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
|
||||
Enables transactions in the binder. See `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
|
||||
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
|
||||
+
|
||||
Default `null` (no transactions)
|
||||
spring.cloud.stream.kafka.binder.transaction.producer.*::
|
||||
Global producer properties for producers in a transactional binder.
|
||||
See `spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix` and <<kafka-producer-properties>> and the general producer properties supported by all binders.
|
||||
+
|
||||
Default: See individual producer properties.
|
||||
|
||||
spring.cloud.stream.kafka.binder.headerMapperBeanName::
|
||||
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to and from Kafka headers.
|
||||
Use this, for example, if you wish to customize the trusted packages in a `BinderHeaderMapper` bean that uses JSON deserialization for the headers.
|
||||
If this custom `BinderHeaderMapper` bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the name `kafkaBinderHeaderMapper` that is of type `BinderHeaderMapper` before falling back to a default `BinderHeaderMapper` created by the binder.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader::
|
||||
Flag to set the binder health as `down`, when any partitions on the topic, regardless of the consumer that is receiving data from it, is found without a leader.
|
||||
+
|
||||
Default: `false`.
|
||||
|
||||
spring.cloud.stream.kafka.binder.certificateStoreDirectory::
|
||||
When the truststore or keystore certificate location is given as a classpath URL (`classpath:...`), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem.
|
||||
This is true for both broker level certificates (`ssl.truststore.location` and `ssl.keystore.location`) and certificates intended for schema registry (`schema.registry.ssl.truststore.location` and `schema.registry.ssl.keystore.location`).
|
||||
Keep in mind that the truststore and keystore classpath locations must be provided under `spring.cloud.stream.kafka.binder.configuration...`.
|
||||
For example, `spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location`, ``spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location`, etc.
|
||||
The file will be moved to the location specified as the value for this property which must be an existing directory on the filesystem that is writable by the process running the application.
|
||||
If this value is not set and the certificate file is a classpath resource, then it will be moved to System's temp directory as returned by `System.getProperty("java.io.tmpdir")`.
|
||||
This is also true, if this value is present, but the directory cannot be found on the filesystem or is not writable.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[kafka-consumer-properties]]
|
||||
==== Kafka Consumer Properties
|
||||
|
||||
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.consumer.<property>=<value>`.
|
||||
|
||||
|
||||
The following properties are available for Kafka consumers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
|
||||
|
||||
admin.configuration::
|
||||
Since version 2.1.1, this property is deprecated in favor of `topic.properties`, and support for it will be removed in a future version.
|
||||
|
||||
admin.replicas-assignment::
|
||||
Since version 2.1.1, this property is deprecated in favor of `topic.replicas-assignment`, and support for it will be removed in a future version.
|
||||
|
||||
admin.replication-factor::
|
||||
Since version 2.1.1, this property is deprecated in favor of `topic.replication-factor`, and support for it will be removed in a future version.
|
||||
|
||||
autoRebalanceEnabled::
|
||||
When `true`, topic partitions is automatically rebalanced between the members of a consumer group.
|
||||
When `false`, each consumer is assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
|
||||
This requires both the `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex` properties to be set appropriately on each launched instance.
|
||||
The value of the `spring.cloud.stream.instanceCount` property must typically be greater than 1 in this case.
|
||||
+
|
||||
Default: `true`.
|
||||
ackEachRecord::
|
||||
When `autoCommitOffset` is `true`, this setting dictates whether to commit the offset after each record is processed.
|
||||
By default, offsets are committed after all records in the batch of records returned by `consumer.poll()` have been processed.
|
||||
The number of records returned by a poll can be controlled with the `max.poll.records` Kafka property, which is set through the consumer `configuration` property.
|
||||
Setting this to `true` may cause a degradation in performance, but doing so reduces the likelihood of redelivered records when a failure occurs.
|
||||
Also, see the binder `requiredAcks` property, which also affects the performance of committing offsets.
|
||||
This property is deprecated as of 3.1 in favor of using `ackMode`.
|
||||
If the `ackMode` is not set and batch mode is not enabled, `RECORD` ackMode will be used.
|
||||
+
|
||||
Default: `false`.
|
||||
|
||||
autoCommitOffset::
|
||||
|
||||
Starting with version 3.1, this property is deprecated.
|
||||
See `ackMode` for more details on alternatives.
|
||||
Whether to autocommit offsets when a message has been processed.
|
||||
If set to `false`, a header with the key `kafka_acknowledgment` of the type `org.springframework.kafka.support.Acknowledgment` header is present in the inbound message.
|
||||
Applications may use this header for acknowledging messages.
|
||||
See the examples section for details.
|
||||
When this property is set to `false`, Kafka binder sets the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL` and the application is responsible for acknowledging records.
|
||||
Also see `ackEachRecord`.
|
||||
+
|
||||
Default: `true`.
|
||||
ackMode::
|
||||
Specify the container ack mode.
|
||||
This is based on the AckMode enumeration defined in Spring Kafka.
|
||||
If `ackEachRecord` property is set to `true` and consumer is not in batch mode, then this will use the ack mode of `RECORD`, otherwise, use the provided ack mode using this property.
|
||||
|
||||
autoCommitOnError::
|
||||
In pollable consumers, if set to `true`, it always auto commits on error.
|
||||
If not set (the default) or false, it will not auto commit in pollable consumers.
|
||||
Note that this property is only applicable for pollable consumers.
|
||||
+
|
||||
Default: not set.
|
||||
resetOffsets::
|
||||
Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
Must be false if a `KafkaBindingRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: `false`.
|
||||
startOffset::
|
||||
The starting offset for new groups.
|
||||
Allowed values: `earliest` and `latest`.
|
||||
If the consumer group is set explicitly for the consumer 'binding' (through `spring.cloud.stream.bindings.<channelName>.group`), 'startOffset' is set to `earliest`. Otherwise, it is set to `latest` for the `anonymous` consumer group.
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: null (equivalent to `earliest`).
|
||||
enableDlq::
|
||||
When set to true, it enables DLQ behavior for the consumer.
|
||||
By default, messages that result in errors are forwarded to a topic named `error.<destination>.<group>`.
|
||||
The DLQ topic name can be configurable by setting the `dlqName` property or by defining a `@Bean` of type `DlqDestinationResolver`.
|
||||
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
|
||||
See <<kafka-dlq-processing>> processing for more information.
|
||||
Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
|
||||
By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
|
||||
See <<dlq-partition-selection>> for how to change that behavior.
|
||||
**Not allowed when `destinationIsPattern` is `true`.**
|
||||
+
|
||||
Default: `false`.
|
||||
dlqPartitions::
|
||||
When `enableDlq` is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created.
|
||||
Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record.
|
||||
This behavior can be changed; see <<dlq-partition-selection>>.
|
||||
If this property is set to `1` and there is no `DqlPartitionFunction` bean, all dead-letter records will be written to partition `0`.
|
||||
If this property is greater than `1`, you **MUST** provide a `DlqPartitionFunction` bean.
|
||||
Note that the actual partition count is affected by the binder's `minPartitionCount` property.
|
||||
+
|
||||
Default: `none`
|
||||
configuration::
|
||||
Map with a key/value pair containing generic Kafka consumer properties.
|
||||
In addition to having Kafka consumer properties, other configuration properties can be passed here.
|
||||
For example some properties needed by the application such as `spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar`.
|
||||
The `bootstrap.servers` property cannot be set here; use multi-binder support if you need to connect to multiple clusters.
|
||||
+
|
||||
Default: Empty map.
|
||||
dlqName::
|
||||
The name of the DLQ topic to receive the error messages.
|
||||
+
|
||||
Default: null (If not specified, messages that result in errors are forwarded to a topic named `error.<destination>.<group>`).
|
||||
dlqProducerProperties::
|
||||
Using this, DLQ-specific producer properties can be set.
|
||||
All the properties available through kafka producer properties can be set through this property.
|
||||
When native decoding is enabled on the consumer (i.e., useNativeDecoding: true) , the application must provide corresponding key/value serializers for DLQ.
|
||||
This must be provided in the form of `dlqProducerProperties.configuration.key.serializer` and `dlqProducerProperties.configuration.value.serializer`.
|
||||
+
|
||||
Default: Default Kafka producer properties.
|
||||
standardHeaders::
|
||||
Indicates which standard headers are populated by the inbound channel adapter.
|
||||
Allowed values: `none`, `id`, `timestamp`, or `both`.
|
||||
Useful if using native deserialization and the first component to receive a message needs an `id` (such as an aggregator that is configured to use a JDBC message store).
|
||||
+
|
||||
Default: `none`
|
||||
converterBeanName::
|
||||
The name of a bean that implements `RecordMessageConverter`. Used in the inbound channel adapter to replace the default `MessagingMessageConverter`.
|
||||
+
|
||||
Default: `null`
|
||||
idleEventInterval::
|
||||
The interval, in milliseconds, between events indicating that no messages have recently been received.
|
||||
Use an `ApplicationListener<ListenerContainerIdleEvent>` to receive these events.
|
||||
See <<pause-resume>> for a usage example.
|
||||
+
|
||||
Default: `30000`
|
||||
destinationIsPattern::
|
||||
When true, the destination is treated as a regular expression `Pattern` used to match topic names by the broker.
|
||||
When true, topics are not provisioned, and `enableDlq` is not allowed, because the binder does not know the topic names during the provisioning phase.
|
||||
Note, the time taken to detect new topics that match the pattern is controlled by the consumer property `metadata.max.age.ms`, which (at the time of writing) defaults to 300,000ms (5 minutes).
|
||||
This can be configured using the `configuration` property above.
|
||||
+
|
||||
Default: `false`
|
||||
topic.properties::
|
||||
A `Map` of Kafka topic properties used when provisioning new topics -- for example, `spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0`
|
||||
+
|
||||
Default: none.
|
||||
topic.replicas-assignment::
|
||||
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments.
|
||||
Used when provisioning new topics.
|
||||
See the `NewTopic` Javadocs in the `kafka-clients` jar.
|
||||
+
|
||||
Default: none.
|
||||
topic.replication-factor::
|
||||
The replication factor to use when provisioning topics. Overrides the binder-wide setting.
|
||||
Ignored if `replicas-assignments` is present.
|
||||
+
|
||||
Default: none (the binder-wide default of -1 is used).
|
||||
pollTimeout::
|
||||
Timeout used for polling in pollable consumers.
|
||||
+
|
||||
Default: 5 seconds.
|
||||
transactionManager::
|
||||
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
|
||||
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
|
||||
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
|
||||
+
|
||||
Default: none.
|
||||
txCommitRecovered::
|
||||
When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default.
|
||||
Setting this property to `false` suppresses committing the offset of recovered record.
|
||||
+
|
||||
Default: true.
|
||||
commonErrorHandlerBeanName::
|
||||
`CommonErrorHandler` bean name to use per consumer binding.
|
||||
When present, this user provided `CommonErrorHandler` takes precedence over any other error handlers defined by the binder.
|
||||
This is a handy way to express error handlers, if the application does not want to use a `ListenerContainerCustomizer` and then check the destination/group combination to set an error handler.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[reset-offsets]]
|
||||
==== Resetting Offsets
|
||||
|
||||
When an application starts, the initial position in each assigned partition depends on two properties `startOffset` and `resetOffsets`.
|
||||
If `resetOffsets` is `false`, normal Kafka consumer https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset[`auto.offset.reset`] semantics apply.
|
||||
i.e. If there is no committed offset for a partition for the binding's consumer group, the position is `earliest` or `latest`.
|
||||
By default, bindings with an explicit `group` use `earliest`, and anonymous bindings (with no `group`) use `latest`.
|
||||
These defaults can be overridden by setting the `startOffset` binding property.
|
||||
There will be no committed offset(s) the first time the binding is started with a particular `group`.
|
||||
The other condition where no committed offset exists is if the offset has been expired.
|
||||
With modern brokers (since 2.1), and default broker properties, the offsets are expired 7 days after the last member leaves the group.
|
||||
See the https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes[`offsets.retention.minutes`] broker property for more information.
|
||||
|
||||
When `resetOffsets` is `true`, the binder applies similar semantics to those that apply when there is no committed offset on the broker, as if this binding has never consumed from the topic; i.e. any current committed offset is ignored.
|
||||
|
||||
Following are two use cases when this might be used.
|
||||
|
||||
1. Consuming from a compacted topic containing key/value pairs.
|
||||
Set `resetOffsets` to `true` and `startOffset` to `earliest`; the binding will perform a `seekToBeginning` on all newly assigned partitions.
|
||||
|
||||
2. Consuming from a topic containing events, where you are only interested in events that occur while this binding is running.
|
||||
Set `resetOffsets` to `true` and `startOffset` to `latest`; the binding will perform a `seekToEnd` on all newly assigned partitions.
|
||||
|
||||
IMPORTANT: If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment.
|
||||
|
||||
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets` should not be set to `true`, otherwise, that will cause an error.
|
||||
|
||||
==== Consuming Batches
|
||||
|
||||
Starting with version 3.0, when `spring.cloud.stream.binding.<name>.consumer.batch-mode` is set to `true`, all of the records received by polling the Kafka `Consumer` will be presented as a `List<?>` to the listener method.
|
||||
Otherwise, the method will be called with one record at a time.
|
||||
The size of the batch is controlled by Kafka consumer properties `max.poll.records`, `fetch.min.bytes`, `fetch.max.wait.ms`; refer to the Kafka documentation for more information.
|
||||
|
||||
Bear in mind that batch mode is not supported with `@StreamListener` - it only works with the newer functional programming model.
|
||||
|
||||
IMPORTANT: Retry within the binder is not supported when using batch mode, so `maxAttempts` will be overridden to 1.
|
||||
You can configure a `SeekToCurrentBatchErrorHandler` (using a `ListenerContainerCustomizer`) to achieve similar functionality to retry in the binder.
|
||||
You can also use a manual `AckMode` and call `Ackowledgment.nack(index, sleep)` to commit the offsets for a partial batch and have the remaining records redelivered.
|
||||
Refer to the https://docs.spring.io/spring-kafka/docs/2.3.0.BUILD-SNAPSHOT/reference/html/#committing-offsets[Spring for Apache Kafka documentation] for more information about these techniques.
|
||||
|
||||
[[kafka-producer-properties]]
|
||||
==== Kafka Producer Properties
|
||||
|
||||
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.producer.<property>=<value>`.
|
||||
|
||||
|
||||
The following properties are available for Kafka producers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
|
||||
|
||||
admin.configuration::
|
||||
Since version 2.1.1, this property is deprecated in favor of `topic.properties`, and support for it will be removed in a future version.
|
||||
|
||||
admin.replicas-assignment::
|
||||
Since version 2.1.1, this property is deprecated in favor of `topic.replicas-assignment`, and support for it will be removed in a future version.
|
||||
|
||||
admin.replication-factor::
|
||||
Since version 2.1.1, this property is deprecated in favor of `topic.replication-factor`, and support for it will be removed in a future version.
|
||||
|
||||
bufferSize::
|
||||
Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.
|
||||
+
|
||||
Default: `16384`.
|
||||
sync::
|
||||
Whether the producer is synchronous.
|
||||
+
|
||||
Default: `false`.
|
||||
sendTimeoutExpression::
|
||||
A SpEL expression evaluated against the outgoing message used to evaluate the time to wait for ack when synchronous publish is enabled -- for example, `headers['mySendTimeout']`.
|
||||
The value of the timeout is in milliseconds.
|
||||
With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a `byte[]`.
|
||||
Now, the expression is evaluated before the payload is converted.
|
||||
+
|
||||
Default: `none`.
|
||||
batchTimeout::
|
||||
How long the producer waits to allow more messages to accumulate in the same batch before sending the messages.
|
||||
(Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
|
||||
+
|
||||
Default: `0`.
|
||||
messageKeyExpression::
|
||||
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message -- for example, `headers['myKey']`.
|
||||
With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a `byte[]`.
|
||||
Now, the expression is evaluated before the payload is converted.
|
||||
In the case of a regular processor (`Function<String, String>` or `Function<Message<?>, Message<?>`), if the produced key needs to be same as the incoming key from the topic, this property can be set as below.
|
||||
`spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']`
|
||||
There is an important caveat to keep in mind for reactive functions.
|
||||
In that case, it is up to the application to manually copy the headers from the incoming messages to outbound messages.
|
||||
You can set the header, e.g. `myKey` and use `headers['myKey']` as suggested above or, for convenience, simply set the `KafkaHeaders.MESSAGE_KEY` header, and you do not need to set this property at all.
|
||||
+
|
||||
Default: `none`.
|
||||
headerPatterns::
|
||||
A comma-delimited list of simple patterns to match Spring messaging headers to be mapped to the Kafka `Headers` in the `ProducerRecord`.
|
||||
Patterns can begin or end with the wildcard character (asterisk).
|
||||
Patterns can be negated by prefixing with `!`.
|
||||
Matching stops after the first match (positive or negative).
|
||||
For example `!ask,as*` will pass `ash` but not `ask`.
|
||||
`id` and `timestamp` are never mapped.
|
||||
+
|
||||
Default: `*` (all headers - except the `id` and `timestamp`)
|
||||
configuration::
|
||||
Map with a key/value pair containing generic Kafka producer properties.
|
||||
The `bootstrap.servers` property cannot be set here; use multi-binder support if you need to connect to multiple clusters.
|
||||
+
|
||||
Default: Empty map.
|
||||
topic.properties::
|
||||
A `Map` of Kafka topic properties used when provisioning new topics -- for example, `spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0`
|
||||
+
|
||||
topic.replicas-assignment::
|
||||
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments.
|
||||
Used when provisioning new topics.
|
||||
See the `NewTopic` Javadocs in the `kafka-clients` jar.
|
||||
+
|
||||
Default: none.
|
||||
topic.replication-factor::
|
||||
The replication factor to use when provisioning topics. Overrides the binder-wide setting.
|
||||
Ignored if `replicas-assignments` is present.
|
||||
+
|
||||
Default: none (the binder-wide default of -1 is used).
|
||||
useTopicHeader::
|
||||
Set to `true` to override the default binding destination (topic name) with the value of the `KafkaHeaders.TOPIC` message header in the outbound message.
|
||||
If the header is not present, the default binding destination is used.
|
||||
+
|
||||
Default: `false`.
|
||||
recordMetadataChannel::
|
||||
The bean name of a `MessageChannel` to which successful send results should be sent; the bean must exist in the application context.
|
||||
The message sent to the channel is the sent message (after conversion, if any) with an additional header `KafkaHeaders.RECORD_METADATA`.
|
||||
The header contains a `RecordMetadata` object provided by the Kafka client; it includes the partition and offset where the record was written in the topic.
|
||||
+
|
||||
`ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)`
|
||||
+
|
||||
Failed sends go the producer error channel (if configured); see <<kafka-error-channels>>.
|
||||
+
|
||||
Default: null.
|
||||
|
||||
NOTE: The Kafka binder uses the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with the `minPartitionCount`, the maximum of the two being the value being used).
|
||||
Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value is used.
|
||||
If a topic already exists with a smaller partition count and `autoAddPartitions` is disabled (the default), the binder fails to start.
|
||||
If a topic already exists with a smaller partition count and `autoAddPartitions` is enabled, new partitions are added.
|
||||
If a topic already exists with a larger number of partitions than the maximum of (`minPartitionCount` or `partitionCount`), the existing partition count is used.
|
||||
|
||||
compression::
|
||||
Set the `compression.type` producer property.
|
||||
Supported values are `none`, `gzip`, `snappy`, `lz4` and `zstd`.
|
||||
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
|
||||
+
|
||||
Default: `none`.
|
||||
transactionManager::
|
||||
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
|
||||
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
|
||||
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
closeTimeout::
|
||||
Timeout in number of seconds to wait for when closing the producer.
|
||||
+
|
||||
Default: `30`
|
||||
|
||||
allowNonTransactional::
|
||||
Normally, all output bindings associated with a transactional binder will publish in a new transaction, if one is not already in process.
|
||||
This property allows you to override that behavior.
|
||||
If set to true, records published to this output binding will not be run in a transaction, unless one is already in process.
|
||||
+
|
||||
Default: `false`
|
||||
|
||||
==== Usage examples
|
||||
|
||||
In this section, we show the use of the preceding properties for specific scenarios.
|
||||
|
||||
===== Example: Setting `ackMode` to `MANUAL` and Relying on Manual Acknowledgement
|
||||
|
||||
This example illustrates how one may manually acknowledge offsets in a consumer application.
|
||||
|
||||
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` be set to `MANUAL`.
|
||||
Use the corresponding input channel name for your example.
|
||||
|
||||
[source]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
public class ManuallyAcknowdledgingConsumer {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
|
||||
}
|
||||
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void process(Message<?> message) {
|
||||
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
|
||||
if (acknowledgment != null) {
|
||||
System.out.println("Acknowledgment provided");
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
===== Example: Security Configuration
|
||||
|
||||
Apache Kafka 0.9 supports secure connections between client and brokers.
|
||||
To take advantage of this feature, follow the guidelines in the https://kafka.apache.org/090/documentation.html#security_configclients[Apache Kafka Documentation] as well as the Kafka 0.9 https://docs.confluent.io/2.0.0/kafka/security.html[security guidelines from the Confluent documentation].
|
||||
Use the `spring.cloud.stream.kafka.binder.configuration` option to set security properties for all clients created by the binder.
|
||||
|
||||
For example, to set `security.protocol` to `SASL_SSL`, set the following property:
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
|
||||
----
|
||||
|
||||
All the other security properties can be set in a similar manner.
|
||||
|
||||
When using Kerberos, follow the instructions in the https://kafka.apache.org/090/documentation.html#security_sasl_clientconfig[reference documentation] for creating and referencing the JAAS configuration.
|
||||
|
||||
Spring Cloud Stream supports passing JAAS configuration information to the application by using a JAAS configuration file and using Spring Boot properties.
|
||||
|
||||
====== Using JAAS Configuration Files
|
||||
|
||||
The JAAS and (optionally) krb5 file locations can be set for Spring Cloud Stream applications by using system properties.
|
||||
The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using a JAAS configuration file:
|
||||
|
||||
[source,bash]
|
||||
----
|
||||
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
|
||||
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
||||
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
|
||||
----
|
||||
|
||||
====== Using Spring Boot Properties
|
||||
|
||||
As an alternative to having a JAAS configuration file, Spring Cloud Stream provides a mechanism for setting up the JAAS configuration for Spring Cloud Stream applications by using Spring Boot properties.
|
||||
|
||||
The following properties can be used to configure the login context of the Kafka client:
|
||||
|
||||
spring.cloud.stream.kafka.binder.jaas.loginModule::
|
||||
The login module name. Not necessary to be set in normal cases.
|
||||
+
|
||||
Default: `com.sun.security.auth.module.Krb5LoginModule`.
|
||||
spring.cloud.stream.kafka.binder.jaas.controlFlag::
|
||||
The control flag of the login module.
|
||||
+
|
||||
Default: `required`.
|
||||
spring.cloud.stream.kafka.binder.jaas.options::
|
||||
Map with a key/value pair containing the login module options.
|
||||
+
|
||||
Default: Empty map.
|
||||
|
||||
The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using Spring Boot configuration properties:
|
||||
|
||||
[source,bash]
|
||||
----
|
||||
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
||||
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
|
||||
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
|
||||
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
|
||||
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
|
||||
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
|
||||
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
|
||||
----
|
||||
|
||||
The preceding example represents the equivalent of the following JAAS file:
|
||||
|
||||
[source]
|
||||
----
|
||||
KafkaClient {
|
||||
com.sun.security.auth.module.Krb5LoginModule required
|
||||
useKeyTab=true
|
||||
storeKey=true
|
||||
keyTab="/etc/security/keytabs/kafka_client.keytab"
|
||||
principal="kafka-client-1@EXAMPLE.COM";
|
||||
};
|
||||
----
|
||||
|
||||
If the topics required already exist on the broker or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent.
|
||||
|
||||
NOTE: Do not mix JAAS configuration files and Spring Boot properties in the same application.
|
||||
If the `-Djava.security.auth.login.config` system property is already present, Spring Cloud Stream ignores the Spring Boot properties.
|
||||
|
||||
NOTE: Be careful when using the `autoCreateTopics` and `autoAddPartitions` with Kerberos.
|
||||
Usually, applications may use principals that do not have administrative rights in Kafka and Zookeeper.
|
||||
Consequently, relying on Spring Cloud Stream to create/modify topics may fail.
|
||||
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.
|
||||
|
||||
====== Multi-binder configuration and JAAS
|
||||
|
||||
When connecting to multiple clusters in which each one requires separate JAAS configuration, then set the JAAS configuration using the property `sasl.jaas.config`.
|
||||
When this property is present in the applicaiton, it takes precedence over the other strategies mentioned above.
|
||||
See this https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients[KIP-85] for more details.
|
||||
|
||||
For example, if you have two clusters in your application with separate JAAS configuration, then the following is a template that you can use:
|
||||
|
||||
```
|
||||
spring.cloud.stream:
|
||||
binders:
|
||||
kafka1:
|
||||
type: kafka
|
||||
environment:
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
kafka:
|
||||
binder:
|
||||
brokers: localhost:9092
|
||||
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
|
||||
kafka2:
|
||||
type: kafka
|
||||
environment:
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
kafka:
|
||||
binder:
|
||||
brokers: localhost:9093
|
||||
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
|
||||
kafka.binder:
|
||||
configuration:
|
||||
security.protocol: SASL_PLAINTEXT
|
||||
sasl.mechanism: PLAIN
|
||||
```
|
||||
|
||||
Note that both the Kafka clusters, and the `sasl.jaas.config` values for each of them are different in the above configuration.
|
||||
|
||||
See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples/kafka-multi-binder-jaas[sample application] for more details on how to setup and run such an application.
|
||||
|
||||
[[pause-resume]]
|
||||
===== Example: Pausing and Resuming the Consumer
|
||||
|
||||
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
|
||||
This is facilitated by managing the binding lifecycle as shown in **Binding visualization and control** in the Spring Cloud Stream documentation, using `State.PAUSED` and `State.RESUMED`.
|
||||
|
||||
To resume, you can use an `ApplicationListener` (or `@EventListener` method) to receive `ListenerContainerIdleEvent` instances.
|
||||
The frequency at which events are published is controlled by the `idleEventInterval` property.
|
||||
|
||||
[[kafka-transactional-binder]]
|
||||
=== Transactional Binder
|
||||
|
||||
Enable transactions by setting `spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix` to a non-empty value, e.g. `tx-`.
|
||||
When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
|
||||
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
|
||||
A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.
|
||||
|
||||
IMPORTANT: Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too.
|
||||
When retries are enabled (the common property `maxAttempts` is greater than zero) the retry properties are used to configure a `DefaultAfterRollbackProcessor` to enable retries at the container level.
|
||||
Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the `DefaultAfterRollbackProcessor` which runs after the main transaction has rolled back.
|
||||
|
||||
If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.
|
||||
|
||||
====
|
||||
[source, java]
|
||||
----
|
||||
@Bean
|
||||
public PlatformTransactionManager transactionManager(BinderFactory binders,
|
||||
@Value("${unique.tx.id.per.instance}") String txId) {
|
||||
|
||||
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
|
||||
MessageChannel.class)).getTransactionalProducerFactory();
|
||||
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
|
||||
tm.setTransactionId(txId)
|
||||
return tm;
|
||||
}
|
||||
----
|
||||
====
|
||||
|
||||
Notice that we get a reference to the binder using the `BinderFactory`; use `null` in the first argument when there is only one binder configured.
|
||||
If more than one binder is configured, use the binder name to get the reference.
|
||||
Once we have a reference to the binder, we can obtain a reference to the `ProducerFactory` and create a transaction manager.
|
||||
|
||||
Then you would use normal Spring transaction support, e.g. `TransactionTemplate` or `@Transactional`, for example:
|
||||
|
||||
====
|
||||
[source, java]
|
||||
----
|
||||
public static class Sender {
|
||||
|
||||
@Transactional
|
||||
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
|
||||
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
====
|
||||
|
||||
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.
|
||||
|
||||
IMPORTANT: If you deploy multiple instances of your application, each instance needs a unique `transactionIdPrefix`.
|
||||
|
||||
[[kafka-error-channels]]
|
||||
=== Error Channels
|
||||
|
||||
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
|
||||
See https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling[this section on error handling] for more information.
|
||||
|
||||
The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:
|
||||
|
||||
* `failedMessage`: The Spring Messaging `Message<?>` that failed to be sent.
|
||||
* `record`: The raw `ProducerRecord` that was created from the `failedMessage`
|
||||
|
||||
There is no automatic handling of producer exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>).
|
||||
You can consume these exceptions with your own Spring Integration flow.
|
||||
|
||||
[[kafka-metrics]]
|
||||
=== Kafka Metrics
|
||||
|
||||
Kafka binder module exposes the following metrics:
|
||||
|
||||
`spring.cloud.stream.binder.kafka.offset`: This metric indicates how many messages have not been yet consumed from a given binder's topic by a given consumer group.
|
||||
The metrics provided are based on the Micrometer library.
|
||||
The binder creates the `KafkaBinderMetrics` bean if Micrometer is on the classpath and no other such beans provided by the application.
|
||||
The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
|
||||
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.
|
||||
|
||||
You can exclude `KafkaBinderMetrics` from creating the necessary infrastructure like consumers and then reporting the metrics by providing the following component in the application.
|
||||
|
||||
```
|
||||
@Component
|
||||
class NoOpBindingMeters {
|
||||
NoOpBindingMeters(MeterRegistry registry) {
|
||||
registry.config().meterFilter(
|
||||
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
More details on how to suppress meters selectively can be found https://micrometer.io/docs/concepts#_meter_filters[here].
|
||||
|
||||
[[kafka-tombstones]]
|
||||
=== Tombstone Records (null record values)
|
||||
|
||||
When using compacted topics, a record with a `null` value (also called a tombstone record) represents the deletion of a key.
|
||||
To receive such messages in a `@StreamListener` method, the parameter must be marked as not required to receive a `null` value argument.
|
||||
|
||||
====
|
||||
[source, java]
|
||||
----
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
|
||||
@Payload(required = false) Customer customer) {
|
||||
// customer is null if a tombstone record
|
||||
...
|
||||
}
|
||||
----
|
||||
====
|
||||
|
||||
[[rebalance-listener]]
|
||||
=== Using a KafkaBindingRebalanceListener
|
||||
|
||||
Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
|
||||
Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
|
||||
|
||||
====
|
||||
[source, java]
|
||||
----
|
||||
public interface KafkaBindingRebalanceListener {
|
||||
|
||||
/**
|
||||
* Invoked by the container before any pending offsets are committed.
|
||||
* @param bindingName the name of the binding.
|
||||
* @param consumer the consumer.
|
||||
* @param partitions the partitions.
|
||||
*/
|
||||
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
|
||||
Collection<TopicPartition> partitions) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked by the container after any pending offsets are committed.
|
||||
* @param bindingName the name of the binding.
|
||||
* @param consumer the consumer.
|
||||
* @param partitions the partitions.
|
||||
*/
|
||||
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked when partitions are initially assigned or after a rebalance.
|
||||
* Applications might only want to perform seek operations on an initial assignment.
|
||||
* @param bindingName the name of the binding.
|
||||
* @param consumer the consumer.
|
||||
* @param partitions the partitions.
|
||||
* @param initial true if this is the initial assignment.
|
||||
*/
|
||||
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
|
||||
boolean initial) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
====
|
||||
|
||||
You cannot set the `resetOffsets` consumer property to `true` when you provide a rebalance listener.
|
||||
|
||||
[[retry-and-dlq-processing]]
|
||||
=== Retry and Dead Letter Processing
|
||||
|
||||
By default, when you configure retry (e.g. `maxAttemts`) and `enableDlq` in a consumer binding, these functions are performed within the binder, with no participation by the listener container or Kafka consumer.
|
||||
|
||||
There are situations where it is preferable to move this functionality to the listener container, such as:
|
||||
|
||||
* The aggregate of retries and delays will exceed the consumer's `max.poll.interval.ms` property, potentially causing a partition rebalance.
|
||||
* You wish to publish the dead letter to a different Kafka cluster.
|
||||
* You wish to add retry listeners to the error handler.
|
||||
* ...
|
||||
|
||||
To configure moving this functionality from the binder to the container, define a `@Bean` of type `ListenerContainerWithDlqAndRetryCustomizer`.
|
||||
This interface has the following methods:
|
||||
|
||||
====
|
||||
[source, java]
|
||||
----
|
||||
/**
|
||||
* Configure the container.
|
||||
* @param container the container.
|
||||
* @param destinationName the destination name.
|
||||
* @param group the group.
|
||||
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
|
||||
* enableDlq).
|
||||
* @param backOff the backOff using retry properties (if configured).
|
||||
* @see #retryAndDlqInBinding(String, String)
|
||||
*/
|
||||
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
|
||||
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
|
||||
@Nullable BackOff backOff);
|
||||
|
||||
/**
|
||||
* Return false to move retries and DLQ from the binding to a customized error handler
|
||||
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
|
||||
* configured via
|
||||
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
|
||||
* @param destinationName the destination name.
|
||||
* @param group the group.
|
||||
* @return true to disable retrie in the binding
|
||||
*/
|
||||
default boolean retryAndDlqInBinding(String destinationName, String group) {
|
||||
return true;
|
||||
}
|
||||
----
|
||||
====
|
||||
|
||||
The destination resolver and `BackOff` are created from the binding properties (if configured).
|
||||
You can then use these to create a custom error handler and dead letter publisher; for example:
|
||||
|
||||
====
|
||||
[source, java]
|
||||
----
|
||||
@Bean
|
||||
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
|
||||
return new ListenerContainerWithDlqAndRetryCustomizer() {
|
||||
|
||||
@Override
|
||||
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
|
||||
String group,
|
||||
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
|
||||
@Nullable BackOff backOff) {
|
||||
|
||||
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
|
||||
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template),
|
||||
dlqDestinationResolver);
|
||||
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retryAndDlqInBinding(String destinationName, String group) {
|
||||
return !destinationName.contains("topicWithLongTotalRetryConfig");
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
----
|
||||
====
|
||||
|
||||
Now, only a single retry delay needs to be greater than the consumer's `max.poll.interval.ms` property.
|
||||
|
||||
[[consumer-producer-config-customizer]]
|
||||
=== Customizing Consumer and Producer configuration
|
||||
|
||||
If you want advanced customization of consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka,
|
||||
you can implement the following customizers.
|
||||
|
||||
* ConsumerConfigCustomizer
|
||||
* ProducerConfigCustomizer
|
||||
|
||||
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
|
||||
For example, if you want to gain access to a bean that is defined at the application level, you can inject that in the implementation of the `configure` method.
|
||||
When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories.
|
||||
|
||||
Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties.
|
||||
|
||||
[[admin-client-config-customization]]
|
||||
=== Customizing AdminClient Configuration
|
||||
|
||||
As with consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an `AdminClientConfigCustomizer`.
|
||||
AdminClientConfigCustomizer's configure method provides access to the admin client properties, using which you can define further customization.
|
||||
Binder's Kafka topic provisioner gives the highest precedence for the properties given through this customizer.
|
||||
Here is an example of providing this customizer bean.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
|
||||
return props -> {
|
||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
[[custom-kafka-binder-health-indicator]]
|
||||
=== Custom Kafka Binder Health Indicator
|
||||
|
||||
Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath.
|
||||
This health indicator checks the health of the binder and any communication issues with the Kafka broker.
|
||||
If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for `KafkaBinderHealth` interface.
|
||||
`KafkaBinderHealth` is a marker interface that extends from `HealthIndicator`.
|
||||
In the custom implementation, it must provide an implementation for the `health()` method.
|
||||
The custom implementation must be present in the application configuration as a bean.
|
||||
When the binder discovers the custom implementation, it will use that instead of the default implementation.
|
||||
Here is an example of such a custom implementation bean in the application.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public KafkaBinderHealth kafkaBinderHealthIndicator() {
|
||||
return new KafkaBinderHealth() {
|
||||
@Override
|
||||
public Health health() {
|
||||
// custom implementation details.
|
||||
}
|
||||
};
|
||||
}
|
||||
```
|
||||
103
docs/src/main/asciidoc/kafka/kafka_partitions.adoc
Normal file
@@ -0,0 +1,103 @@
|
||||
=== Partitioning with the Kafka Binder
|
||||
|
||||
Apache Kafka supports topic partitioning natively.
|
||||
|
||||
Sometimes it is advantageous to send data to specific partitions -- for example, when you want to strictly order message processing (all messages for a particular customer should go to the same partition).
|
||||
|
||||
The following example shows how to configure the producer and consumer side:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Source.class)
|
||||
public class KafkaPartitionProducerApplication {
|
||||
|
||||
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
||||
|
||||
private static final String[] data = new String[] {
|
||||
"foo1", "bar1", "qux1",
|
||||
"foo2", "bar2", "qux2",
|
||||
"foo3", "bar3", "qux3",
|
||||
"foo4", "bar4", "qux4",
|
||||
};
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
|
||||
.web(false)
|
||||
.run(args);
|
||||
}
|
||||
|
||||
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
|
||||
public Message<?> generate() {
|
||||
String value = data[RANDOM.nextInt(data.length)];
|
||||
System.out.println("Sending: " + value);
|
||||
return MessageBuilder.withPayload(value)
|
||||
.setHeader("partitionKey", value)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
.application.yml
|
||||
[source, yaml]
|
||||
----
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
output:
|
||||
destination: partitioned.topic
|
||||
producer:
|
||||
partition-key-expression: headers['partitionKey']
|
||||
partition-count: 12
|
||||
----
|
||||
|
||||
IMPORTANT: The topic must be provisioned to have enough partitions to achieve the desired concurrency for all consumer groups.
|
||||
The above configuration supports up to 12 consumer instances (6 if their `concurrency` is 2, 4 if their concurrency is 3, and so on).
|
||||
It is generally best to "`over-provision`" the partitions to allow for future increases in consumers or concurrency.
|
||||
|
||||
NOTE: The preceding configuration uses the default partitioning (`key.hashCode() % partitionCount`).
|
||||
This may or may not provide a suitably balanced algorithm, depending on the key values.
|
||||
You can override this default by using the `partitionSelectorExpression` or `partitionSelectorClass` properties.
|
||||
|
||||
Since partitions are natively handled by Kafka, no special configuration is needed on the consumer side.
|
||||
Kafka allocates partitions across the instances.
|
||||
|
||||
The following Spring Boot application listens to a Kafka stream and prints (to the console) the partition ID to which each message goes:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
public class KafkaPartitionConsumerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
|
||||
.web(false)
|
||||
.run(args);
|
||||
}
|
||||
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
||||
System.out.println(in + " received from partition " + partition);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
.application.yml
|
||||
[source, yaml]
|
||||
----
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
input:
|
||||
destination: partitioned.topic
|
||||
group: myGroup
|
||||
----
|
||||
|
||||
You can add instances as needed.
|
||||
Kafka rebalances the partition allocations.
|
||||
If the instance count (or `instance count * concurrency`) exceeds the number of partitions, some consumers are idle.
|
||||
865
docs/src/main/asciidoc/kafka/kafka_tips.adoc
Normal file
@@ -0,0 +1,865 @@
|
||||
== Tips, Tricks and Recipes
|
||||
|
||||
=== Simple DLQ with Kafka
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
As a developer, I want to write a consumer application that processes records from a Kafka topic.
|
||||
However, if some error occurs in processing, I don't want the application to stop completely.
|
||||
Instead, I want to send the record in error to a DLT (Dead-Letter-Topic) and then continue processing new records.
|
||||
|
||||
==== Solution
|
||||
|
||||
The solution for this problem is to use the DLQ feature in Spring Cloud Stream.
|
||||
For the purposes of this discussion, let us assume that the following is our processor function.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Consumer<byte[]> processData() {
|
||||
return s -> {
|
||||
throw new RuntimeException();
|
||||
};
|
||||
```
|
||||
|
||||
This is a very trivial function that throws an exception for all the records that it processes, but you can take this function and extend it to any other similar situations.
|
||||
|
||||
In order to send the records in error to a DLT, we need to provide the following configuration.
|
||||
|
||||
```
|
||||
spring.cloud.stream:
|
||||
bindings:
|
||||
processData-in-0:
|
||||
group: my-group
|
||||
destination: input-topic
|
||||
kafka:
|
||||
bindings:
|
||||
processData-in-0:
|
||||
consumer:
|
||||
enableDlq: true
|
||||
dlqName: input-topic-dlq
|
||||
```
|
||||
|
||||
In order to activate DLQ, the application must provide a group name.
|
||||
Anonymous consumers cannot use the DLQ facilities.
|
||||
We also need to enable DLQ by setting the `enableDLQ` property on the Kafka consumer binding to `true`.
|
||||
Finally, we can optionally provide the DLT name by providing the `dlqName` on Kafka consumer binding, which otherwise default to `input-topic-dlq.my-group.error` in this case.
|
||||
|
||||
Note that in the example consumer provided above, the type of the payload is `byte[]`.
|
||||
By default, the DLQ producer in Kafka binder expects the payload of type `byte[]`.
|
||||
If that is not the case, then we need to provide the configuration for proper serializer.
|
||||
For example, let us re-write the consumer function as below:
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Consumer<String> processData() {
|
||||
return s -> {
|
||||
throw new RuntimeException();
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
Now, we need to tell Spring Cloud Stream, how we want to serialize the data when writing to the DLT.
|
||||
Here is the modified configuration for this scenario:
|
||||
|
||||
```
|
||||
spring.cloud.stream:
|
||||
bindings:
|
||||
processData-in-0:
|
||||
group: my-group
|
||||
destination: input-topic
|
||||
kafka:
|
||||
bindings:
|
||||
processData-in-0:
|
||||
consumer:
|
||||
enableDlq: true
|
||||
dlqName: input-topic-dlq
|
||||
dlqProducerProperties:
|
||||
configuration:
|
||||
value.serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
|
||||
```
|
||||
|
||||
=== DLQ with Advanced Retry Options
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
This is similar to the recipe above, but as a developer I would like to configure the way retries are handled.
|
||||
|
||||
==== Solution
|
||||
|
||||
If you followed the above recipe, then you get the default retry options built into the Kafka binder when the processing encounters an error.
|
||||
|
||||
By default, the binder retires for a maximum of 3 attempts with a one second initial delay, 2.0 multiplier with each back off with a max delay of 10 seconds.
|
||||
You can change all these configurations as below:
|
||||
|
||||
```
|
||||
spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
|
||||
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
|
||||
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
|
||||
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
|
||||
```
|
||||
|
||||
If you want, you can also provide a list of retryable exceptions by providing a map of boolean values.
|
||||
For example,
|
||||
|
||||
```
|
||||
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
|
||||
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
|
||||
```
|
||||
|
||||
By default, any exceptions not listed in the map above will be retried.
|
||||
If that is not desired, then you can disable that by providing,
|
||||
|
||||
```
|
||||
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
|
||||
```
|
||||
|
||||
You can also provide your own `RetryTemplate` and mark it as `@StreamRetryTemplate` which will be scanned and used by the binder.
|
||||
This is useful when you want more sophisticated retry strategies and policies.
|
||||
|
||||
If you have multiple `@StreamRetryTemplate` beans, then you can specify which one your binding wants by using the property,
|
||||
|
||||
```
|
||||
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
|
||||
```
|
||||
|
||||
=== Handling Deserialization errors with DLQ
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
I have a processor that encounters a deserilzartion exception in Kafka consumer.
|
||||
I would expect that the Spring Cloud Stream DLQ mechanism will catch that scenario, but it does not.
|
||||
How can I handle this?
|
||||
|
||||
==== Solution
|
||||
|
||||
The normal DLQ mechanism offered by Spring Cloud Stream will not help when Kafka consumer throws an irrecoverable deserialization excepion.
|
||||
This is because, this exception happens even before the consumer's `poll()` method returns.
|
||||
Spring for Apache Kafka project offers some great ways to help the binder with this situation.
|
||||
Let us explore those.
|
||||
|
||||
Assuming this is our function:
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Consumer<String> functionName() {
|
||||
return s -> {
|
||||
System.out.println(s);
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
It is a trivial function that takes a `String` parameter.
|
||||
|
||||
We want to bypass the message converters provided by Spring Cloud Stream and want to use native deserializers instead.
|
||||
In the case of `String` types, it does not make much sense, but for more complex types like AVRO etc. you have to rely on external deserializers and therefore want to delegate the conversion to Kafka.
|
||||
|
||||
Now when the consumer receives the data, let us assume that there is a bad record that causes a deserilziation errror, maybe someone passed an `Integer` instead of a `String` for example.
|
||||
In that case, if you don't do something in the application, the excption will be propagated through the chain and your application will exit eventually.
|
||||
|
||||
In order to handle this, you can add a `ListenerContainerCustomizer` `@Bean` that configures a `SeekToCurrentErrorHandler`.
|
||||
This `SeekToCurrentErrorHandler` is configured with a `DeadLetterPublishingRecoverer`.
|
||||
We also need to configure an `ErrorHandlingDeserializer` for the consumer.
|
||||
That sounds like a lot of complex things, but in reality, it boils down to these 3 beans in this case.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
|
||||
return (container, dest, group) -> {
|
||||
container.setErrorHandler(errorHandler);
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
```
|
||||
@Bean
|
||||
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
|
||||
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
|
||||
}
|
||||
```
|
||||
|
||||
```
|
||||
@Bean
|
||||
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
|
||||
return new DeadLetterPublishingRecoverer(bytesTemplate);
|
||||
}
|
||||
```
|
||||
|
||||
Let us analyze each of them.
|
||||
The first one is the `ListenerContainerCustomizer` bean that takes a `SeekToCurrentErrorHandler`.
|
||||
The container is now customized with that particular error handler.
|
||||
You can learn more about container customization https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_advanced_consumer_configuration[here].
|
||||
|
||||
The second bean is the `SeekToCurrentErrorHandler` that is configured with a publishing to a `DLT`.
|
||||
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current[here] for more details on `SeekToCurrentErrorHandler`.
|
||||
|
||||
The third bean is the `DeadLetterPublishingRecoverer` that is ultimately responsible for sending to the `DLT`.
|
||||
By default, the `DLT` topic is named as the ORIGINAL_TOPIC_NAME.DLT.
|
||||
You can change that though.
|
||||
See the https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters[docs] for more details.
|
||||
|
||||
|
||||
We also need to configure an https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer[ErrorHandlingDeserializer] through application config.
|
||||
|
||||
The `ErrorHandlingDeserializer` delegates to the actual deserializer.
|
||||
In case of errors, it sets key/value of the record to be null and includes the raw bytes of the message.
|
||||
It then sets the exception in a header and passes this record to the listener, which then calls the registered error handler.
|
||||
|
||||
Following is the configuration required:
|
||||
|
||||
```
|
||||
spring.cloud.stream:
|
||||
function:
|
||||
definition: functionName
|
||||
bindings:
|
||||
functionName-in-0:
|
||||
group: group-name
|
||||
destination: input-topic
|
||||
consumer:
|
||||
use-native-decoding: true
|
||||
kafka:
|
||||
bindings:
|
||||
functionName-in-0:
|
||||
consumer:
|
||||
enableDlq: true
|
||||
dlqName: dlq-topic
|
||||
dlqProducerProperties:
|
||||
configuration:
|
||||
value.serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
configuration:
|
||||
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
|
||||
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
|
||||
```
|
||||
|
||||
We are providing the `ErrorHandlingDeserializer` through the `configuration` property on the binding.
|
||||
We are also indicating that the actual deserializer to delegate is the `StringDeserializer`.
|
||||
|
||||
Keep in mind that none of the dlq properties above are relevant for the discussions in this recipe.
|
||||
They are purely meant for addressing any application level errors only.
|
||||
|
||||
=== Basic offset management in Kafka binder
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
I want to write a Spring Cloud Stream Kafka consumer applicaiton and not sure about how it manages Kafka consumer offsets.
|
||||
Can you exaplain?
|
||||
|
||||
==== Solution
|
||||
|
||||
We encourage you read the https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#reset-offsets[docs] section on this to get a thorough understanding on it.
|
||||
|
||||
Here is it in a gist:
|
||||
|
||||
Kafka supports two types of offsets to start with by default - `earliest` and `latest`.
|
||||
Their semantics are self-explanatory from their names.
|
||||
|
||||
Assuming you are running the consumer for the first time.
|
||||
If you miss the group.id in your Spring Cloud Stream application, then it becomes an anonymous consumer.
|
||||
Whenever, you have an anonymous consumer, in that case, Spring Cloud Stream application by default will start from the `latest` available offset in the topic partition.
|
||||
On the other hand, if you explicitly specify a group.id, then by default, the Spring Cloud Stream application will start from the `earliest` available offset in the topic partiton.
|
||||
|
||||
In both cases above (consumers with explicit groups and anonymous groups), the starting offset can be switched around by using the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset` and setting it to either `earliest` or `latest`.
|
||||
|
||||
Now, assume that you already ran the consumer before and now starting it again.
|
||||
In this case, the starting offset semantics in the above case do not apply as the consumer finds an already committed offset for the consumer group (In the case of an anonymous consumer, although the application does not provide a group.id, the binder will auto generate one for you).
|
||||
It simply picks up from the last committed offset onward.
|
||||
This is true, even when you have a `startOffset` value provided.
|
||||
|
||||
However, you can override the default behavior where the consumer starts from the last committed offset by using the `resetOffsets` property.
|
||||
In order to do that, set the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets` to `true` (which is `false` by default).
|
||||
Then make sure you provide the `startOffset` value (either `earliest` or `latest`).
|
||||
When you do that and then start the consumer application, each time you start, it starts as if this is starting for the first time and ignore any committed offsets for the partition.
|
||||
|
||||
=== Seeking to arbitrary offsets in Kafka
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
Using Kafka binder, I know that it can set the offset to either `earliest` or `latest`, but I have a requirement to seek the offset to something in the middle, an arbitrary offset.
|
||||
Is there a way to achieve this using Spring Cloud Stream Kafka biner?
|
||||
|
||||
==== Solution
|
||||
|
||||
Previously we saw how Kafka binder allows you to tackle basic offset management.
|
||||
By default, the binder does not allow you to rewind to an arbitrary offset, at least through the mechanism we saw in that reipce.
|
||||
However, there are some low-level strategies that the binder provides to achieve this use case.
|
||||
Let's explore them.
|
||||
|
||||
First of all, when you want to reset to an arbitrary offset other than `earliest` or `latest`, make sure to leave the `resetOffsets` configuration to its defaults, which is `false`.
|
||||
Then you have to provide a custom bean of type `KafkaBindingRebalanceListener`, which will be injected into all consumer bindings.
|
||||
It is an interface that comes with a few default methods, but here is the method that we are interested in:
|
||||
|
||||
```
|
||||
/**
|
||||
* Invoked when partitions are initially assigned or after a rebalance. Applications
|
||||
* might only want to perform seek operations on an initial assignment. While the
|
||||
* 'initial' argument is true for each thread (when concurrency is greater than 1),
|
||||
* implementations should keep track of exactly which partitions have been sought.
|
||||
* There is a race in that a rebalance could occur during startup and so a topic/
|
||||
* partition that has been sought on one thread may be re-assigned to another
|
||||
* thread and you may not wish to re-seek it at that time.
|
||||
* @param bindingName the name of the binding.
|
||||
* @param consumer the consumer.
|
||||
* @param partitions the partitions.
|
||||
* @param initial true if this is the initial assignment on the current thread.
|
||||
*/
|
||||
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
|
||||
Collection<TopicPartition> partitions, boolean initial) {
|
||||
// do nothing
|
||||
}
|
||||
```
|
||||
|
||||
Let us look at the details.
|
||||
|
||||
In essence, this method will be invoked each time during the initial assignment for a topic partition or after a rebalance.
|
||||
For better illustration, let us assume that our topic is `foo` and it has 4 partitions.
|
||||
Initially, we are only starting a single consumer in the group and this consumer will consume from all partitions.
|
||||
When the consumer starts for the first time, all 4 partitions are getting initially assigned.
|
||||
However, we do not want to start the partitions to consume at the defaults (`earliest` since we define a group), rather for each partition, we want them to consume after seeking to arbitrary offsets.
|
||||
Imagine that you have a business case to consume from certain offsets as below.
|
||||
|
||||
```
|
||||
Partition start offset
|
||||
|
||||
0 1000
|
||||
1 2000
|
||||
2 2000
|
||||
3 1000
|
||||
```
|
||||
|
||||
This could be achieved by implementing the above method as below.
|
||||
|
||||
```
|
||||
|
||||
@Override
|
||||
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
|
||||
|
||||
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
|
||||
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
|
||||
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
|
||||
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
|
||||
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
|
||||
|
||||
if (initial) {
|
||||
partitions.forEach(tp -> {
|
||||
if (topicPartitionOffset.containsKey(tp)) {
|
||||
final Long offset = topicPartitionOffset.get(tp);
|
||||
try {
|
||||
consumer.seek(tp, offset);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Handle excpetions carefully.
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This is just a rudimentary implementation.
|
||||
Real world use cases are much more complex than this and you need to adjust accordingly, but this certainly gives you a basic sketch.
|
||||
When consumer `seek` fails, it may throw some runtime exceptions and you need to decide what to do in those cases.
|
||||
|
||||
==== What if we start a second consumer with the same group id?
|
||||
|
||||
When we add a second consumer, a rebalance will occur and some partitions will be moved around.
|
||||
Let's say that the new consumer gets partitions `2` and `3`.
|
||||
When this new Spring Cloud Stream consumer calls this `onPartitionsAssigned` method, it will see that this is the initial assignment for partititon `2` and `3` on this consumer.
|
||||
Therefore, it will do the seek operation becuase of the conditional check on the `initial` argument.
|
||||
In the case of the first consumer, it now only has partitons `0` and `1`
|
||||
However, for this consumer it was simply a rebalance event and not considered as an intial assignment.
|
||||
Thus, it will not re-seek to the given offsets because of the conditional check on the `initial` argument.
|
||||
|
||||
=== How do I manually acknowledge using Kafka binder?
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
Using Kafka binder, I want to manually acknowledge messages in my consumer.
|
||||
How do I do that?
|
||||
|
||||
==== Solution
|
||||
|
||||
By default, Kafka binder delegates to the default commit settings in Spring for Apache Kafka project.
|
||||
The default `ackMode` in Spring Kafka is `batch`.
|
||||
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets[here] for more details on that.
|
||||
|
||||
There are situations in which you want to disable this default commit behavior and rely on manual commits.
|
||||
Following steps allow you to do that.
|
||||
|
||||
Set the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode` to either `MANUAL` or `MANUAL_IMMEDIATE`.
|
||||
When it is set like that, then there will be a header called `kafka_acknowledgment` (from `KafkaHeaders.ACKNOWLEDGMENT`) present in the message received by the consumer method.
|
||||
|
||||
For example, imagine this as your consumer method.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Consumer<Message<String>> myConsumer() {
|
||||
return msg -> {
|
||||
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
|
||||
if (acknowledgment != null) {
|
||||
System.out.println("Acknowledgment provided");
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
Then you set the property `spring.cloud.stream.bindings.myConsumer-in-0.consumer.ackMode` to `MANUAL` or `MANUAL_IMMEDIATE`.
|
||||
|
||||
=== How do I override the default binding names in Spring Cloud Stream?
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
Spring Cloud Stream creates default bindings based on the function definition and signature, but how do I override these to more domain friendly names?
|
||||
|
||||
==== Solution
|
||||
|
||||
Assume that following is your function signature.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Function<String, String> uppercase(){
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
By default, Spring Cloud Stream will create the bindings as below.
|
||||
|
||||
1. uppercase-in-0
|
||||
2. uppercase-out-0
|
||||
|
||||
You can override these bindings to something by using the following properties.
|
||||
|
||||
```
|
||||
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
|
||||
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
|
||||
```
|
||||
|
||||
After this, all binding properties must be made on the new names, `my-transformer-in` and `my-transformer-out`.
|
||||
|
||||
Here is another example with Kafka Streams and multiple inputs.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
By default, Spring Cloud Stream will create three different binding names for this function.
|
||||
|
||||
1. processOrder-in-0
|
||||
2. processOrder-in-1
|
||||
3. processOrder-out-0
|
||||
|
||||
You have to use these binding names each time you want to set some configuration on these bindings.
|
||||
You don't like that, and you want to use more domain-friendly and readable binding names, for example, something like.
|
||||
|
||||
1. orders
|
||||
2. accounts
|
||||
3. enrichedOrders
|
||||
|
||||
You can easily do that by simply setting these three properties
|
||||
|
||||
1. spring.cloud.stream.function.bindings.processOrder-in-0=orders
|
||||
2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts
|
||||
3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
|
||||
|
||||
Once you do that, it overrides the default binding names and any properties that you want to set on them must be on these new binding names.
|
||||
|
||||
=== How do I send a message key as part of my record?
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
I need to send a key along with the payload of the record, is there a way to do that in Spring Cloud Stream?
|
||||
|
||||
==== Solution
|
||||
|
||||
It is often necessary that you want to send associative data structure like a map as the record with a key and value.
|
||||
Spring Cloud Stream allows you to do that in a straightforward manner.
|
||||
Following is a basic blueprint for doing this, but you may want to adapt it to your paricular use case.
|
||||
|
||||
Here is sample producer method (aka `Supplier`).
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Supplier<Message<String>> supplier() {
|
||||
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
|
||||
}
|
||||
```
|
||||
|
||||
This is a trivial function that sends a message with a `String` payload, but also with a key.
|
||||
Note that we set the key as a message header using `KafkaHeaders.MESSAGE_KEY`.
|
||||
|
||||
If you want to change the key from the default `kafka_messageKey`, then in the configuration, we need to specify this property:
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
|
||||
```
|
||||
|
||||
Please note that we use the binding name `supplier-out-0` since that is our function name, please update accordingly.
|
||||
|
||||
Then, we use this new key when we produce the message.
|
||||
|
||||
=== How do I use native serializer and deserializer instead of message conversion done by Spring Cloud Stream?
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
Instead of using the message converters in Spring Cloud Stream, I want to use native Serializer and Deserializer in Kafka.
|
||||
By default, Spring Cloud Stream takes care of this conversion using its internal built-in message converters.
|
||||
How can I bypass this and delegate the responsibility to Kafka?
|
||||
|
||||
==== Solution
|
||||
|
||||
This is really easy to do.
|
||||
|
||||
All you have to do is to provide the following property to enable native serialization.
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
|
||||
```
|
||||
|
||||
Then, you need to also set the serailzers.
|
||||
There are a couple of ways to do this.
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.key.serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.value.serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
```
|
||||
|
||||
or using the binder configuration.
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.binder.configurarion.key.serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
spring.cloud.stream.kafka.binder.configurarion.value.serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
```
|
||||
|
||||
When using the binder way, it is applied against all the bindings whereas setting them at the bindings are per binding.
|
||||
|
||||
On the deserializing side, you just need to provide the deserializers as configuration.
|
||||
|
||||
For example,
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configurarion.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
```
|
||||
|
||||
You can also set them at the binder level.
|
||||
|
||||
There is an optional property that you can set to force native decoding.
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
|
||||
```
|
||||
|
||||
However, in the case of Kafka binder, this is unncessary, as by the time it reaches the binder, Kafka already deserializes them using the configured deserializers.
|
||||
|
||||
=== Explain how offset resetting work in Kafka Streams binder
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
By default, Kafka Streams binder always starts from the earliest offset for a new consumer.
|
||||
Sometimes, it is beneficial or required by the application to start from the latest offset.
|
||||
Kafka Streams binder allows you to do that.
|
||||
|
||||
==== Solution
|
||||
|
||||
Before we look at the solution, let us look at the following scenario.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
|
||||
(s, t) -> s.join(t, ...)
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
We have a `BiConsumer` bean that requires two input bindings.
|
||||
In this case, the first binding is for a `KStream` and the second one is for a `KTable`.
|
||||
When running this application for the first time, by default, both bindings start from the `earliest` offset.
|
||||
What about I want to start from the `latest` offset due to some requirements?
|
||||
You can do this by enabling the following properties.
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
|
||||
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
|
||||
```
|
||||
|
||||
If you want only one binding to start from the `latest` offset and the other to consumer from the default `earliest`, then leave the latter binding out from the configuration.
|
||||
|
||||
Keep in mind that, once there are committed offsets, these setting are *not* honored and the committed offsets take precedence.
|
||||
|
||||
=== Keeping track of successful sending of records (producing) in Kafka
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
I have a Kafka producer application and I want to keep track of all my successful sedings.
|
||||
|
||||
==== Solution
|
||||
|
||||
Let us assume that we have this following supplier in the application.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Supplier<Message<String>> supplier() {
|
||||
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
|
||||
}
|
||||
```
|
||||
|
||||
Then, we need to define a new `MessageChannel` bean to capture all the successful send information.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public MessageChannel fooRecordChannel() {
|
||||
return new DirectChannel();
|
||||
}
|
||||
```
|
||||
|
||||
Next, define this property in the application configuration to provide the bean name for the `recordMetadataChannel`.
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
|
||||
```
|
||||
|
||||
At this point, successful sent information will be sent to the `fooRecordChannel`.
|
||||
|
||||
You can write an `IntegrationFlow` as below to see the information.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public IntegrationFlow integrationFlow() {
|
||||
return f -> f.channel("fooRecordChannel")
|
||||
.handle((payload, messageHeaders) -> payload);
|
||||
}
|
||||
```
|
||||
|
||||
In the `handle` method, the payload is what got sent to Kafka and the message headers contain a special key called `kafka_recordMetadata`.
|
||||
Its value is a `RecordMetadata` that contains information about topic partition, current offset etc.
|
||||
|
||||
=== Adding custom header mapper in Kafka
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?
|
||||
|
||||
==== Solution
|
||||
|
||||
Under normal circumstances, this should be fine.
|
||||
|
||||
Imagine, you have the following producer.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Supplier<Message<String>> supply() {
|
||||
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
|
||||
}
|
||||
```
|
||||
|
||||
On the consumer side, you should still see the header "foo", and the following should not give you any issues.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Consumer<Message<String>> consume() {
|
||||
return s -> {
|
||||
final String foo = (String)s.getHeaders().get("foo");
|
||||
System.out.println(foo);
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
If you provide a https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties[custom header mapper] in the application, then this won't work.
|
||||
Let's say you have an empty `KafkaHeaderMapper` in the application.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
|
||||
return new KafkaHeaderMapper() {
|
||||
@Override
|
||||
public void fromHeaders(MessageHeaders headers, Headers target) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, Map<String, Object> target) {
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
If that is your implementation, then you will miss the `foo` header on the consumer.
|
||||
Chances are that, you may have some logic inside those `KafkaHeaderMapper` methods.
|
||||
You need the following to populate the `foo` header.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
|
||||
return new KafkaHeaderMapper() {
|
||||
@Override
|
||||
public void fromHeaders(MessageHeaders headers, Headers target) {
|
||||
final String foo = (String) headers.get("foo");
|
||||
target.add("foo", foo.getBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, Map<String, Object> target) {
|
||||
final Header foo = source.lastHeader("foo");
|
||||
target.put("foo", new String(foo.value()));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
That will properly populate the `foo` header from the producer to consumer.
|
||||
|
||||
==== Special note on the id header
|
||||
|
||||
In Spring Cloud Stream, the `id` header is a special header, but some applications may want to have special custom id headers - something like `custom-id` or `ID` or `Id`.
|
||||
The first one (`custom-id`) will propagate without any custom header mapper from producer to consumer.
|
||||
However, if you produce with a variant of the framework reserved `id` header - such as `ID`, `Id`, `iD` etc. then you will run into issues with the internals of the framework.
|
||||
See this https://stackoverflow.com/questions/68412600/change-the-behaviour-in-spring-cloud-stream-make-header-matcher-case-sensitive[StackOverflow thread] fore more context on this use case.
|
||||
In that case, you must use a custom `KafkaHeaderMapper` to map the case-sensitive id header.
|
||||
For example, let's say you have the following producer.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Supplier<Message<String>> supply() {
|
||||
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
|
||||
}
|
||||
```
|
||||
|
||||
The header `Id` above will be gone from the consuming side as it clashes with the framework `id` header.
|
||||
You can provide a custom `KafkaHeaderMapper` to solve this issue.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
|
||||
return new KafkaHeaderMapper() {
|
||||
@Override
|
||||
public void fromHeaders(MessageHeaders headers, Headers target) {
|
||||
final String myId = (String) headers.get("Id");
|
||||
target.add("Id", myId.getBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, Map<String, Object> target) {
|
||||
final Header Id = source.lastHeader("Id");
|
||||
target.put("Id", new String(Id.value()));
|
||||
}
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
By doing this, both `id` and `Id` headers will be available from the producer to the consumer side.
|
||||
|
||||
=== Producing to multiple topics in transaction
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
How do I produce transactional messages to multiple Kafka topics?
|
||||
|
||||
For more context, see this https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud[StackOverflow question].
|
||||
|
||||
==== Solution
|
||||
|
||||
Use transactional support in Kafka binder for transactions and then provide an `AfterRollbackProcessor`.
|
||||
In order to produce to multiple topics, use `StreamBridge` API.
|
||||
|
||||
Below are the code snippets for this:
|
||||
|
||||
```
|
||||
@Autowired
|
||||
StreamBridge bridge;
|
||||
|
||||
@Bean
|
||||
Consumer<String> input() {
|
||||
return str -> {
|
||||
System.out.println(str);
|
||||
this.bridge.send("left", str.toUpperCase());
|
||||
this.bridge.send("right", str.toLowerCase());
|
||||
if (str.equals("Fail")) {
|
||||
throw new RuntimeException("test");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
|
||||
return (container, dest, group) -> {
|
||||
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
|
||||
MessageChannel.class)).getTransactionalProducerFactory();
|
||||
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
|
||||
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
|
||||
container.setAfterRollbackProcessor(rollbackProcessor);
|
||||
};
|
||||
}
|
||||
|
||||
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
|
||||
return new DefaultAfterRollbackProcessor<>(
|
||||
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
==== Required Configuration
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
|
||||
spring.cloud.stream.kafka.binder.required-acks=all
|
||||
spring.cloud.stream.bindings.input-in-0.group=foo
|
||||
spring.cloud.stream.bindings.input-in-0.destination=input
|
||||
spring.cloud.stream.bindings.left.destination=left
|
||||
spring.cloud.stream.bindings.right.destination=right
|
||||
|
||||
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
|
||||
```
|
||||
|
||||
in order to test, you can use the following:
|
||||
|
||||
```
|
||||
@Bean
|
||||
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
|
||||
return args -> {
|
||||
System.in.read();
|
||||
template.send("input", "Fail".getBytes());
|
||||
template.send("input", "Good".getBytes());
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
Some important notes:
|
||||
|
||||
Please ensure that you don't have any DLQ settings on the application configuration as we manually configure DLT (By default it will be published to a topic named `input.DLT` based on the initial consumer function).
|
||||
Also, reset the `maxAttempts` on consumer binding to `1` in order to avoid retries by the binder.
|
||||
It will be max tried a total of 3 in the example above (initial try + the 2 attempts in the `FixedBackoff`).
|
||||
|
||||
See the https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud[StackOverflow thread] for more details on how to test this code.
|
||||
If you are using Spring Cloud Stream to test it by adding more consumer functions, make sure to set the `isolation-level` on the consumer binding to `read-committed`.
|
||||
|
||||
This https://stackoverflow.com/questions/68941306/spring-cloud-stream-database-transaction-does-not-roll-back[StackOverflow thread] is also related to this discussion.
|
||||
|
||||
=== Pitfalls to avoid when running multiple pollable consumers
|
||||
|
||||
==== Problem Statement
|
||||
|
||||
How can I run multiple instances of the pollable consumers and generate unique `client.id` for each instance?
|
||||
|
||||
==== Solution
|
||||
|
||||
Assuming that I have the following definition:
|
||||
|
||||
```
|
||||
spring.cloud.stream.pollable-source: foo
|
||||
spring.cloud.stream.bindings.foo-in-0.group: my-group
|
||||
```
|
||||
|
||||
When running the application, the Kafka consumer generates a client.id (something like `consumer-my-group-1`).
|
||||
For each instance of the application that is running, this `client.id` will be the same, causing unexpected issues.
|
||||
|
||||
In order to fix this, you can add the following property on each instance of the application:
|
||||
|
||||
```
|
||||
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
|
||||
```
|
||||
|
||||
See this https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1139[GitHub issue] for more details.
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
:github-tag: master
|
||||
:github-repo: spring-cloud/spring-cloud-stream-binder-kafka
|
||||
:github-raw: https://raw.githubusercontent.com/{github-repo}/{github-tag}
|
||||
:github-code: https://github.com/{github-repo}/tree/{github-tag}
|
||||
:toc: left
|
||||
:toclevels: 8
|
||||
:nofooter:
|
||||
:sectlinks: true
|
||||
|
||||
|
||||
[[spring-cloud-stream-binder-kafka-reference]]
|
||||
= Spring Cloud Stream Kafka Binder Reference Guide
|
||||
Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Thomas Risberg, Dave Syer, David Turanski, Janne Valkealahti, Benjamin Klein, Henryk Konsek, Gary Russell, Arnaud Jardiné, Soby Chacko
|
||||
:doctype: book
|
||||
:toc:
|
||||
:toclevels: 4
|
||||
:source-highlighter: prettify
|
||||
:numbered:
|
||||
:icons: font
|
||||
:hide-uri-scheme:
|
||||
:spring-cloud-stream-binder-kafka-repo: snapshot
|
||||
:github-tag: master
|
||||
:spring-cloud-stream-binder-kafka-docs-version: current
|
||||
:spring-cloud-stream-binder-kafka-docs: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/{spring-cloud-stream-binder-kafka-docs-version}/reference
|
||||
:spring-cloud-stream-binder-kafka-docs-current: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/
|
||||
:github-repo: spring-cloud/spring-cloud-stream-binder-kafka
|
||||
:github-raw: https://raw.github.com/{github-repo}/{github-tag}
|
||||
:github-code: https://github.com/{github-repo}/tree/{github-tag}
|
||||
:github-wiki: https://github.com/{github-repo}/wiki
|
||||
:github-master-code: https://github.com/{github-repo}/tree/master
|
||||
:sc-ext: java
|
||||
// ======================================================================================
|
||||
|
||||
|
||||
*{project-version}*
|
||||
|
||||
|
||||
= Reference Guide
|
||||
include::kafka/kafka_overview.adoc[]
|
||||
|
||||
include::kafka/kafka_dlq.adoc[]
|
||||
|
||||
include::kafka/kafka_partitions.adoc[]
|
||||
|
||||
include::kafka/kafka-streams.adoc[]
|
||||
|
||||
include::kafka/kafka_tips.adoc[]
|
||||
|
||||
// ======================================================================================
|
||||
@@ -52,7 +52,7 @@ If you chose RabbitMQ for the middleware, your Spring Initializr should now be a
|
||||
[%hardbreaks]
|
||||
[%hardbreaks]
|
||||
[%hardbreaks]
|
||||
image::/docs/src/main/asciidoc/images/spring-initializr.png[align="center"]
|
||||
image::spring-initializr.png[align="center"]
|
||||
|
||||
[%hardbreaks]
|
||||
[%hardbreaks]
|
||||
|
||||
BIN
docs/src/main/asciidoc/rabbit/images/part-bindings.png
Normal file
|
After Width: | Height: | Size: 60 KiB |
BIN
docs/src/main/asciidoc/rabbit/images/part-exchange.png
Normal file
|
After Width: | Height: | Size: 24 KiB |
BIN
docs/src/main/asciidoc/rabbit/images/part-queues.png
Normal file
|
After Width: | Height: | Size: 46 KiB |
BIN
docs/src/main/asciidoc/rabbit/images/rabbit-binder.png
Normal file
|
After Width: | Height: | Size: 12 KiB |
243
docs/src/main/asciidoc/rabbit/rabbit_dlq.adoc
Normal file
@@ -0,0 +1,243 @@
|
||||
[[rabbit-dlq-processing]]
|
||||
== Dead-Letter Queue Processing
|
||||
|
||||
Because you cannot anticipate how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them.
|
||||
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue.
|
||||
However, if the problem is a permanent issue, that could cause an infinite loop.
|
||||
The following Spring Boot application shows an example of how to route those messages back to the original queue but moves them to a third "`parking lot`" queue after three attempts.
|
||||
The second example uses the https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/[RabbitMQ Delayed Message Exchange] to introduce a delay to the re-queued message.
|
||||
In this example, the delay increases for each attempt.
|
||||
These examples use a `@RabbitListener` to receive messages from the DLQ.
|
||||
You could also use `RabbitTemplate.receive()` in a batch process.
|
||||
|
||||
The examples assume the original destination is `so8400in` and the consumer group is `so8400`.
|
||||
|
||||
=== Non-Partitioned Destinations
|
||||
|
||||
The first two examples are for when the destination is *not* partitioned:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
public class ReRouteDlqApplication {
|
||||
|
||||
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
|
||||
|
||||
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
|
||||
|
||||
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
|
||||
|
||||
private static final String X_RETRIES_HEADER = "x-retries";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
|
||||
System.out.println("Press enter to exit");
|
||||
System.in.read();
|
||||
context.close();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@RabbitListener(queues = DLQ)
|
||||
public void rePublish(Message failedMessage) {
|
||||
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
|
||||
if (retriesHeader == null) {
|
||||
retriesHeader = Integer.valueOf(0);
|
||||
}
|
||||
if (retriesHeader < 3) {
|
||||
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
|
||||
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
|
||||
}
|
||||
else {
|
||||
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue parkingLot() {
|
||||
return new Queue(PARKING_LOT);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
public class ReRouteDlqApplication {
|
||||
|
||||
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
|
||||
|
||||
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
|
||||
|
||||
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
|
||||
|
||||
private static final String X_RETRIES_HEADER = "x-retries";
|
||||
|
||||
private static final String DELAY_EXCHANGE = "dlqReRouter";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
|
||||
System.out.println("Press enter to exit");
|
||||
System.in.read();
|
||||
context.close();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@RabbitListener(queues = DLQ)
|
||||
public void rePublish(Message failedMessage) {
|
||||
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
|
||||
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
|
||||
if (retriesHeader == null) {
|
||||
retriesHeader = Integer.valueOf(0);
|
||||
}
|
||||
if (retriesHeader < 3) {
|
||||
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
|
||||
headers.put("x-delay", 5000 * retriesHeader);
|
||||
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
|
||||
}
|
||||
else {
|
||||
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DirectExchange delayExchange() {
|
||||
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
|
||||
exchange.setDelayed(true);
|
||||
return exchange;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindOriginalToDelay() {
|
||||
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue parkingLot() {
|
||||
return new Queue(PARKING_LOT);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
=== Partitioned Destinations
|
||||
|
||||
With partitioned destinations, there is one DLQ for all partitions. We determine the original queue from the headers.
|
||||
|
||||
==== `republishToDlq=false`
|
||||
|
||||
When `republishToDlq` is `false`, RabbitMQ publishes the message to the DLX/DLQ with an `x-death` header containing information about the original destination, as shown in the following example:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
public class ReRouteDlqApplication {
|
||||
|
||||
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
|
||||
|
||||
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
|
||||
|
||||
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
|
||||
|
||||
private static final String X_DEATH_HEADER = "x-death";
|
||||
|
||||
private static final String X_RETRIES_HEADER = "x-retries";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
|
||||
System.out.println("Press enter to exit");
|
||||
System.in.read();
|
||||
context.close();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@RabbitListener(queues = DLQ)
|
||||
public void rePublish(Message failedMessage) {
|
||||
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
|
||||
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
|
||||
if (retriesHeader == null) {
|
||||
retriesHeader = Integer.valueOf(0);
|
||||
}
|
||||
if (retriesHeader < 3) {
|
||||
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
|
||||
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
|
||||
String exchange = (String) xDeath.get(0).get("exchange");
|
||||
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
|
||||
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
|
||||
}
|
||||
else {
|
||||
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue parkingLot() {
|
||||
return new Queue(PARKING_LOT);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
==== `republishToDlq=true`
|
||||
|
||||
When `republishToDlq` is `true`, the republishing recoverer adds the original exchange and routing key to headers, as shown in the following example:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
public class ReRouteDlqApplication {
|
||||
|
||||
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
|
||||
|
||||
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
|
||||
|
||||
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
|
||||
|
||||
private static final String X_RETRIES_HEADER = "x-retries";
|
||||
|
||||
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
|
||||
|
||||
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
|
||||
System.out.println("Press enter to exit");
|
||||
System.in.read();
|
||||
context.close();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@RabbitListener(queues = DLQ)
|
||||
public void rePublish(Message failedMessage) {
|
||||
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
|
||||
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
|
||||
if (retriesHeader == null) {
|
||||
retriesHeader = Integer.valueOf(0);
|
||||
}
|
||||
if (retriesHeader < 3) {
|
||||
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
|
||||
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
|
||||
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
|
||||
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
|
||||
}
|
||||
else {
|
||||
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue parkingLot() {
|
||||
return new Queue(PARKING_LOT);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
1266
docs/src/main/asciidoc/rabbit/rabbit_overview.adoc
Normal file
125
docs/src/main/asciidoc/rabbit/rabbit_partitions.adoc
Normal file
@@ -0,0 +1,125 @@
|
||||
== Partitioning with the RabbitMQ Binder
|
||||
|
||||
RabbitMQ does not support partitioning natively.
|
||||
|
||||
Sometimes, it is advantageous to send data to specific partitions -- for example, when you want to strictly order message processing, all messages for a particular customer should go to the same partition.
|
||||
|
||||
The `RabbitMessageChannelBinder` provides partitioning by binding a queue for each partition to the destination exchange.
|
||||
|
||||
The following Java and YAML examples show how to configure the producer:
|
||||
|
||||
.Producer
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Source.class)
|
||||
public class RabbitPartitionProducerApplication {
|
||||
|
||||
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
||||
|
||||
private static final String[] data = new String[] {
|
||||
"abc1", "def1", "qux1",
|
||||
"abc2", "def2", "qux2",
|
||||
"abc3", "def3", "qux3",
|
||||
"abc4", "def4", "qux4",
|
||||
};
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
|
||||
.web(false)
|
||||
.run(args);
|
||||
}
|
||||
|
||||
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
|
||||
public Message<?> generate() {
|
||||
String value = data[RANDOM.nextInt(data.length)];
|
||||
System.out.println("Sending: " + value);
|
||||
return MessageBuilder.withPayload(value)
|
||||
.setHeader("partitionKey", value)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
.application.yml
|
||||
[source, yaml]
|
||||
----
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
output:
|
||||
destination: partitioned.destination
|
||||
producer:
|
||||
partitioned: true
|
||||
partition-key-expression: headers['partitionKey']
|
||||
partition-count: 2
|
||||
required-groups:
|
||||
- myGroup
|
||||
----
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
The configuration in the prececing example uses the default partitioning (`key.hashCode() % partitionCount`).
|
||||
This may or may not provide a suitably balanced algorithm, depending on the key values.
|
||||
You can override this default by using the `partitionSelectorExpression` or `partitionSelectorClass` properties.
|
||||
|
||||
The `required-groups` property is required only if you need the consumer queues to be provisioned when the producer is deployed.
|
||||
Otherwise, any messages sent to a partition are lost until the corresponding consumer is deployed.
|
||||
====
|
||||
|
||||
The following configuration provisions a topic exchange:
|
||||
|
||||
image::part-exchange.png[scaledwidth="50%"]
|
||||
|
||||
The following queues are bound to that exchange:
|
||||
|
||||
image::part-queues.png[scaledwidth="50%"]
|
||||
|
||||
The following bindings associate the queues to the exchange:
|
||||
|
||||
image::part-bindings.png[scaledwidth="50%"]
|
||||
|
||||
The following Java and YAML examples continue the previous examples and show how to configure the consumer:
|
||||
|
||||
.Consumer
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
public class RabbitPartitionConsumerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
|
||||
.web(false)
|
||||
.run(args);
|
||||
}
|
||||
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
|
||||
System.out.println(in + " received from queue " + queue);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
.application.yml
|
||||
[source, yaml]
|
||||
----
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
input:
|
||||
destination: partitioned.destination
|
||||
group: myGroup
|
||||
consumer:
|
||||
partitioned: true
|
||||
instance-index: 0
|
||||
----
|
||||
|
||||
IMPORTANT: The `RabbitMessageChannelBinder` does not support dynamic scaling.
|
||||
There must be at least one consumer per partition.
|
||||
The consumer's `instanceIndex` is used to indicate which partition is consumed.
|
||||
Platforms such as Cloud Foundry can have only one instance with an `instanceIndex`.
|
||||
@@ -0,0 +1,46 @@
|
||||
:github-tag: master
|
||||
:github-repo: spring-cloud/spring-cloud-stream-binder-rabbit
|
||||
:github-raw: https://raw.githubusercontent.com/{github-repo}/{github-tag}
|
||||
:github-code: https://github.com/{github-repo}/tree/{github-tag}
|
||||
:toc: left
|
||||
:toclevels: 8
|
||||
:nofooter:
|
||||
:sectlinks: true
|
||||
|
||||
[[spring-cloud-stream-binder-rabbit-reference]]
|
||||
= Spring Cloud Stream RabbitMQ Binder Reference Guide
|
||||
Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Thomas Risberg, Dave Syer, David Turanski, Janne Valkealahti, Benjamin Klein, Gary Russell, Jay Bryant
|
||||
:doctype: book
|
||||
:toc:
|
||||
:toclevels: 4
|
||||
:source-highlighter: prettify
|
||||
:numbered:
|
||||
:icons: font
|
||||
:hide-uri-scheme:
|
||||
:spring-cloud-stream-binder-rabbit-repo: snapshot
|
||||
:github-tag: master
|
||||
:spring-cloud-stream-binder-rabbit-docs-version: current
|
||||
:spring-cloud-stream-binder-rabbit-docs: https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/{spring-cloud-stream-binder-rabbit-docs-version}/reference
|
||||
:spring-cloud-stream-binder-rabbit-docs-current: https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current-SNAPSHOT/reference/html/
|
||||
:github-repo: spring-cloud/spring-cloud-stream-binder-rabbit
|
||||
:github-raw: https://raw.github.com/{github-repo}/{github-tag}
|
||||
:github-code: https://github.com/{github-repo}/tree/{github-tag}
|
||||
:github-wiki: https://github.com/{github-repo}/wiki
|
||||
:github-master-code: https://github.com/{github-repo}/tree/master
|
||||
:sc-ext: java
|
||||
|
||||
// ======================================================================================
|
||||
|
||||
*{project-version}*
|
||||
|
||||
|
||||
= Reference Guide
|
||||
|
||||
|
||||
include::rabbit/rabbit_overview.adoc[]
|
||||
|
||||
include::rabbit/rabbit_dlq.adoc[]
|
||||
|
||||
include::rabbit/rabbit_partitions.adoc[]
|
||||
|
||||
// ======================================================================================
|
||||
@@ -93,7 +93,7 @@ exposed by the external brokers and input/output arguments in your code. Broker
|
||||
necessary to establish bindings are handled by middleware-specific _Binder_ implementations.
|
||||
|
||||
.Spring Cloud Stream Application
|
||||
image::{github-raw}/docs/src/main/asciidoc/images/SCSt-with-binder.png[width=800,scaledwidth="75%",align="center"]
|
||||
image::SCSt-with-binder.png[width=800,scaledwidth="75%",align="center"]
|
||||
|
||||
==== Fat JAR
|
||||
|
||||
@@ -130,7 +130,7 @@ Communication between applications follows a publish-subscribe model, where data
|
||||
This can be seen in the following figure, which shows a typical deployment for a set of interacting Spring Cloud Stream applications.
|
||||
|
||||
.Spring Cloud Stream Publish-Subscribe
|
||||
image::{github-raw}/docs/src/main/asciidoc/images/SCSt-sensors.png[width=800,scaledwidth="75%",align="center"]
|
||||
image::SCSt-sensors.png[width=800,scaledwidth="75%",align="center"]
|
||||
|
||||
Data reported by sensors to an HTTP endpoint is sent to a common destination named `raw-sensor-data`.
|
||||
From the destination, it is independently processed by a microservice application that computes time-windowed averages and by another microservice application that ingests the raw data into HDFS (Hadoop Distributed File System).
|
||||
@@ -155,7 +155,7 @@ Each consumer binding can use the `spring.cloud.stream.bindings.<bindingName>.gr
|
||||
For the consumers shown in the following figure, this property would be set as `spring.cloud.stream.bindings.<bindingName>.group=hdfsWrite` or `spring.cloud.stream.bindings.<bindingName>.group=average`.
|
||||
|
||||
.Spring Cloud Stream Consumer Groups
|
||||
image::{github-raw}/docs/src/main/asciidoc/images/SCSt-groups.png[width=800,scaledwidth="75%",align="center"]
|
||||
image::SCSt-groups.png[width=800,scaledwidth="75%",align="center"]
|
||||
|
||||
All groups that subscribe to a given destination receive a copy of published data, but only one member of each group receives a given message from that destination.
|
||||
By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups.
|
||||
@@ -200,7 +200,7 @@ Spring Cloud Stream provides a common abstraction for implementing partitioned p
|
||||
Partitioning can thus be used whether the broker itself is naturally partitioned (for example, Kafka) or not (for example, RabbitMQ).
|
||||
|
||||
.Spring Cloud Stream Partitioning
|
||||
image::{github-raw}/docs/src/main/asciidoc/images/SCSt-partitioning.png[width=800,scaledwidth="75%",align="center"]
|
||||
image::SCSt-partitioning.png[width=800,scaledwidth="75%",align="center"]
|
||||
|
||||
Partitioning is a critical concept in stateful processing, where it is critical (for either performance or consistency reasons) to ensure that all related data is processed together.
|
||||
For example, in the time-windowed average calculation example, it is important that all measurements from any given sensor are processed by the same application instance.
|
||||
@@ -215,7 +215,7 @@ To understand the programming model, you should be familiar with the following c
|
||||
* *Bindings:* Bridge between the external messaging systems and application provided _Producers_ and _Consumers_ of messages (created by the Destination Binders).
|
||||
* *Message:* The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).
|
||||
|
||||
image::{github-raw}/docs/src/main/asciidoc/images/SCSt-overview.png[width=800,scaledwidth="75%",align="center"]
|
||||
image::SCSt-overview.png[width=800,scaledwidth="75%",align="center"]
|
||||
|
||||
=== Destination Binders
|
||||
|
||||
@@ -1693,7 +1693,7 @@ This section provides information about the main concepts behind the Binder SPI,
|
||||
The following image shows the general relationship of producers and consumers:
|
||||
|
||||
.Producers and Consumers
|
||||
image::{github-raw}/docs/src/main/asciidoc/images/producers-consumers.png[width=800,scaledwidth="75%",align="center"]
|
||||
image::producers-consumers.png[width=800,scaledwidth="75%",align="center"]
|
||||
|
||||
A producer is any component that sends messages to a binding destination.
|
||||
The binding destination can be bound to an external message broker with a `Binder` implementation for that broker.
|
||||
|
||||