GH-164: Document Log Compaction Support

Resolves #164 (https://github.com/spring-projects/spring-kafka/issues/164)

Also fix message conversion for `send(Message<?> m)`.
This commit is contained in:
Gary Russell
2016-08-12 12:17:29 -04:00
committed by Artem Bilan
parent d295c9f71d
commit 632559a341
3 changed files with 67 additions and 3 deletions

View File

@@ -97,7 +97,13 @@ public class MessagingMessageConverter implements MessageConverter {
* @return the payload.
*/
protected Object convertPayload(Message<?> message) {
return message.getPayload();
Object payload = message.getPayload();
if (payload instanceof KafkaNull) {
return null;
}
else {
return payload;
}
}
/**

View File

@@ -58,6 +58,7 @@ import org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageLi
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -199,7 +200,8 @@ public class EnableKafkaIntegrationTests {
@Test
public void testMulti() throws Exception {
template.send("annotated8", 0, "foo");
template.send("annotated8", 0, 1, "foo");
template.send("annotated8", 0, 1, null);
template.flush();
assertThat(this.multiListener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
}
@@ -552,13 +554,18 @@ public class EnableKafkaIntegrationTests {
@KafkaListener(id = "multi", topics = "annotated8")
static class MultiListenerBean {
private final CountDownLatch latch1 = new CountDownLatch(1);
private final CountDownLatch latch1 = new CountDownLatch(2);
@KafkaHandler
public void bar(String bar) {
latch1.countDown();
}
@KafkaHandler
public void bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
latch1.countDown();
}
public void foo(String bar) {
}

View File

@@ -1,3 +1,4 @@
[[kafka]]
=== Using Spring for Apache Kafka
@@ -512,3 +513,53 @@ Hence we use `startsWith` in the condition.
CAUTION: If you wish to use the idle event to stop the lister container, you should not call `container.stop()` on the thread that calls the listener - it will cause delays and unnecessary log messages.
Instead, you should hand off the event to a different thread that can then stop the container.
Also, you should not `stop()` the container instance in the event if it is a child container, you should stop the concurrent container instead.
==== Log Compaction
When using https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction[Log Compaction], it is possible to send and receive messages with `null` payloads which identifies the deletion of a key.
Starting with _version 1.0.3_, this is now fully supported.
To send a `null` payload using the `KafkaTemplate` simply pass null into the value argument of the `send()` methods.
One exception to this is the `send(Message<?> message)` variant.
Since `spring-messaging` `Message<?>` cannot have a `null` payload, a special payload type `KafkaNull` is used and the framework will send `null`.
For convenience, the static `KafkaNull.INSTANCE` is provided.
When using a message listener container, the received `ConsumerRecord` will have a `null` `value()`.
To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`; you will usually also need the key so your application knows which key was "deleted":
[source, java]
----
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// value == null represents key deletion
}
----
When using a class-level `@KafkaListener`, some additional configuration is needed - a `@KafkaHandler` method with a `KafkaNull` payload:
[source, java]
----
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
private final CountDownLatch latch1 = new CountDownLatch(2);
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}
----