diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.java index 59374cdd5..0fcc077c0 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.java @@ -24,6 +24,9 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -117,7 +120,9 @@ public class DltAwareProcessor implements Processor { StreamBridge streamBridge = this.dltPublishingContext.getStreamBridge(); if (streamBridge != null) { - streamBridge.send(this.dltDestination, r.value()); + Message message = MessageBuilder.withPayload(r.value()) + .setHeader(KafkaHeaders.KEY, r.key()).build(); + streamBridge.send(this.dltDestination, message); } }; } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DltAwareProcessorTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DltAwareProcessorTests.java index d88944c58..5ce3e28e6 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DltAwareProcessorTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DltAwareProcessorTests.java @@ -21,6 +21,9 @@ import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.KStream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -57,6 +60,7 @@ public class DltAwareProcessorTests { public static void setUp() { Map consumerProps = KafkaTestUtils.consumerProps("group", "false", embeddedKafka); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>( consumerProps); @@ -77,6 +81,7 @@ public class DltAwareProcessorTests { try (ConfigurableApplicationContext context = app.run("--server.port=0", "--spring.cloud.stream.bindings.errorStream-in-0.destination=error-stream-in", + "--spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer", "--spring.cloud.stream.kafka.streams.bindings.errorStream-in-0.consumer.application-id=test-error-stream-app-id", "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000", "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" @@ -91,15 +96,17 @@ public class DltAwareProcessorTests { private void receiveAndValidate(String in, String out) { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); - DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>( + senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>( senderProps); try { - KafkaTemplate template = new KafkaTemplate<>(pf, true); + KafkaTemplate template = new KafkaTemplate<>(pf, true); template.setDefaultTopic(in); - template.sendDefault("foobar"); + template.send(in, "my-key", "foobar"); ConsumerRecord cr = KafkaTestUtils.getSingleRecord(consumer, out); assertThat(cr.value().contains("foobar")).isTrue(); + assertThat(cr.key().contains("my-key")).isTrue(); } finally { pf.destroy(); diff --git a/docs/modules/ROOT/pages/kafka/kafka-streams-binder/error-handling.adoc b/docs/modules/ROOT/pages/kafka/kafka-streams-binder/error-handling.adoc index a983b25f1..d425e6f2f 100644 --- a/docs/modules/ROOT/pages/kafka/kafka-streams-binder/error-handling.adoc +++ b/docs/modules/ROOT/pages/kafka/kafka-streams-binder/error-handling.adoc @@ -189,3 +189,17 @@ public java.util.function.Function, KStream