From 632559a341e02ab3aaa0bb2339ac980560470a17 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 12 Aug 2016 12:17:29 -0400 Subject: [PATCH] GH-164: Document Log Compaction Support Resolves #164 (https://github.com/spring-projects/spring-kafka/issues/164) Also fix message conversion for `send(Message m)`. --- .../converter/MessagingMessageConverter.java | 8 ++- .../EnableKafkaIntegrationTests.java | 11 +++- src/reference/asciidoc/kafka.adoc | 51 +++++++++++++++++++ 3 files changed, 67 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index 1c06cf62..baa87e37 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -97,7 +97,13 @@ public class MessagingMessageConverter implements MessageConverter { * @return the payload. */ protected Object convertPayload(Message message) { - return message.getPayload(); + Object payload = message.getPayload(); + if (payload instanceof KafkaNull) { + return null; + } + else { + return payload; + } } /** diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 042adbdd..7c484de4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -58,6 +58,7 @@ import org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageLi import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.kafka.support.converter.StringJsonMessageConverter; import org.springframework.kafka.test.rule.KafkaEmbedded; @@ -199,7 +200,8 @@ public class EnableKafkaIntegrationTests { @Test public void testMulti() throws Exception { - template.send("annotated8", 0, "foo"); + template.send("annotated8", 0, 1, "foo"); + template.send("annotated8", 0, 1, null); template.flush(); assertThat(this.multiListener.latch1.await(60, TimeUnit.SECONDS)).isTrue(); } @@ -552,13 +554,18 @@ public class EnableKafkaIntegrationTests { @KafkaListener(id = "multi", topics = "annotated8") static class MultiListenerBean { - private final CountDownLatch latch1 = new CountDownLatch(1); + private final CountDownLatch latch1 = new CountDownLatch(2); @KafkaHandler public void bar(String bar) { latch1.countDown(); } + @KafkaHandler + public void bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { + latch1.countDown(); + } + public void foo(String bar) { } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 8255f2c8..09041977 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -1,3 +1,4 @@ + [[kafka]] === Using Spring for Apache Kafka @@ -512,3 +513,53 @@ Hence we use `startsWith` in the condition. CAUTION: If you wish to use the idle event to stop the lister container, you should not call `container.stop()` on the thread that calls the listener - it will cause delays and unnecessary log messages. Instead, you should hand off the event to a different thread that can then stop the container. Also, you should not `stop()` the container instance in the event if it is a child container, you should stop the concurrent container instead. + +==== Log Compaction + +When using https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction[Log Compaction], it is possible to send and receive messages with `null` payloads which identifies the deletion of a key. + +Starting with _version 1.0.3_, this is now fully supported. + +To send a `null` payload using the `KafkaTemplate` simply pass null into the value argument of the `send()` methods. +One exception to this is the `send(Message message)` variant. +Since `spring-messaging` `Message` cannot have a `null` payload, a special payload type `KafkaNull` is used and the framework will send `null`. +For convenience, the static `KafkaNull.INSTANCE` is provided. + +When using a message listener container, the received `ConsumerRecord` will have a `null` `value()`. + +To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`; you will usually also need the key so your application knows which key was "deleted": + +[source, java] +---- +@KafkaListener(id = "deletableListener", topics = "myTopic") +public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { + // value == null represents key deletion +} +---- + +When using a class-level `@KafkaListener`, some additional configuration is needed - a `@KafkaHandler` method with a `KafkaNull` payload: + +[source, java] +---- +@KafkaListener(id = "multi", topics = "myTopic") +static class MultiListenerBean { + + private final CountDownLatch latch1 = new CountDownLatch(2); + + @KafkaHandler + public void listen(String foo) { + ... + } + + @KafkaHandler + public void listen(Integer bar) { + ... + } + + @KafkaHandler + public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { + ... + } + +} +----