GH-164: Document Log Compaction Support
Resolves #164 (https://github.com/spring-projects/spring-kafka/issues/164) Also fix message conversion for `send(Message<?> m)`.
This commit is contained in:
committed by
Artem Bilan
parent
d295c9f71d
commit
632559a341
@@ -97,7 +97,13 @@ public class MessagingMessageConverter implements MessageConverter {
|
||||
* @return the payload.
|
||||
*/
|
||||
protected Object convertPayload(Message<?> message) {
|
||||
return message.getPayload();
|
||||
Object payload = message.getPayload();
|
||||
if (payload instanceof KafkaNull) {
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
return payload;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -58,6 +58,7 @@ import org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageLi
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.KafkaNull;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
@@ -199,7 +200,8 @@ public class EnableKafkaIntegrationTests {
|
||||
|
||||
@Test
|
||||
public void testMulti() throws Exception {
|
||||
template.send("annotated8", 0, "foo");
|
||||
template.send("annotated8", 0, 1, "foo");
|
||||
template.send("annotated8", 0, 1, null);
|
||||
template.flush();
|
||||
assertThat(this.multiListener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
}
|
||||
@@ -552,13 +554,18 @@ public class EnableKafkaIntegrationTests {
|
||||
@KafkaListener(id = "multi", topics = "annotated8")
|
||||
static class MultiListenerBean {
|
||||
|
||||
private final CountDownLatch latch1 = new CountDownLatch(1);
|
||||
private final CountDownLatch latch1 = new CountDownLatch(2);
|
||||
|
||||
@KafkaHandler
|
||||
public void bar(String bar) {
|
||||
latch1.countDown();
|
||||
}
|
||||
|
||||
@KafkaHandler
|
||||
public void bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
|
||||
latch1.countDown();
|
||||
}
|
||||
|
||||
public void foo(String bar) {
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
|
||||
[[kafka]]
|
||||
=== Using Spring for Apache Kafka
|
||||
|
||||
@@ -512,3 +513,53 @@ Hence we use `startsWith` in the condition.
|
||||
CAUTION: If you wish to use the idle event to stop the lister container, you should not call `container.stop()` on the thread that calls the listener - it will cause delays and unnecessary log messages.
|
||||
Instead, you should hand off the event to a different thread that can then stop the container.
|
||||
Also, you should not `stop()` the container instance in the event if it is a child container, you should stop the concurrent container instead.
|
||||
|
||||
==== Log Compaction
|
||||
|
||||
When using https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction[Log Compaction], it is possible to send and receive messages with `null` payloads which identifies the deletion of a key.
|
||||
|
||||
Starting with _version 1.0.3_, this is now fully supported.
|
||||
|
||||
To send a `null` payload using the `KafkaTemplate` simply pass null into the value argument of the `send()` methods.
|
||||
One exception to this is the `send(Message<?> message)` variant.
|
||||
Since `spring-messaging` `Message<?>` cannot have a `null` payload, a special payload type `KafkaNull` is used and the framework will send `null`.
|
||||
For convenience, the static `KafkaNull.INSTANCE` is provided.
|
||||
|
||||
When using a message listener container, the received `ConsumerRecord` will have a `null` `value()`.
|
||||
|
||||
To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`; you will usually also need the key so your application knows which key was "deleted":
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@KafkaListener(id = "deletableListener", topics = "myTopic")
|
||||
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
|
||||
// value == null represents key deletion
|
||||
}
|
||||
----
|
||||
|
||||
When using a class-level `@KafkaListener`, some additional configuration is needed - a `@KafkaHandler` method with a `KafkaNull` payload:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@KafkaListener(id = "multi", topics = "myTopic")
|
||||
static class MultiListenerBean {
|
||||
|
||||
private final CountDownLatch latch1 = new CountDownLatch(2);
|
||||
|
||||
@KafkaHandler
|
||||
public void listen(String foo) {
|
||||
...
|
||||
}
|
||||
|
||||
@KafkaHandler
|
||||
public void listen(Integer bar) {
|
||||
...
|
||||
}
|
||||
|
||||
@KafkaHandler
|
||||
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
|
||||
...
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
Reference in New Issue
Block a user