Sending record keys as part of DltAwareProcessor

This commit is contained in:
Soby Chacko
2023-10-12 20:26:29 -04:00
parent f811ed0b7c
commit 4b53c839ac
3 changed files with 30 additions and 4 deletions

View File

@@ -24,6 +24,9 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@@ -117,7 +120,9 @@ public class DltAwareProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, V
return (r, e) -> {
StreamBridge streamBridge = this.dltPublishingContext.getStreamBridge();
if (streamBridge != null) {
streamBridge.send(this.dltDestination, r.value());
Message<VIn> message = MessageBuilder.withPayload(r.value())
.setHeader(KafkaHeaders.KEY, r.key()).build();
streamBridge.send(this.dltDestination, message);
}
};
}

View File

@@ -21,6 +21,9 @@ import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -57,6 +60,7 @@ public class DltAwareProcessorTests {
public static void setUp() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
@@ -77,6 +81,7 @@ public class DltAwareProcessorTests {
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.cloud.stream.bindings.errorStream-in-0.destination=error-stream-in",
"--spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer",
"--spring.cloud.stream.kafka.streams.bindings.errorStream-in-0.consumer.application-id=test-error-stream-app-id",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
@@ -91,15 +96,17 @@ public class DltAwareProcessorTests {
private void receiveAndValidate(String in, String out) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic(in);
template.sendDefault("foobar");
template.send(in, "my-key", "foobar");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer,
out);
assertThat(cr.value().contains("foobar")).isTrue();
assertThat(cr.key().contains("my-key")).isTrue();
}
finally {
pf.destroy();

View File

@@ -189,3 +189,17 @@ public java.util.function.Function<KStream<String, String>, KStream<String, Stri
```
In this case, when a record fails, `DltAwareProcessor` does not use its built-in recoverer, which publishes to a DLT; instead, it uses the user-provided recoverer, a `BiConsumer` that takes the failed record and the thrown exception as arguments.
=== Handling Record Keys in DltAwareProcessor
When `DltAwareProcessor` sends failed records to a DLT and you want the record keys to be sent along with them, you need to set the proper key serializer on the DLT binding.
This is because `DltAwareProcessor` uses `StreamBridge`, which in turn uses the regular (message-channel based) Kafka binder, and that binder uses a `ByteArraySerializer` for keys by default.
In the case of record values, Spring Cloud Stream converts the payload to a proper `byte[]`; however, that is not the case with keys, as it simply passes along whatever it received in the header as the key.
If you provide a non-byte-array key, this might cause class cast exceptions. To avoid that, you need to set a key serializer on the DLT binding, as shown below.
The following example assumes that the DLT destination is `hello-dlt-1` and that the record key is of type `String`.
```
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
```