From 819a0ce8441972219d5f7a225b51c4294bc00a74 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 7 Jul 2016 20:32:39 -0400 Subject: [PATCH] GH-141: Add `MANUAL` acks only in `acknowledge()` Fixes GH-141 (https://github.com/spring-projects/spring-kafka/issues/141) The `MANUAL` acks are intended to be acknowledged only by the end-listener initiative. An unconditional `this.acks.add(record)` is in the `invokeListener()` function by mistake. * Wrap `this.acks.add(record)` in the `invokeListener()` to `if (!this.isAnyManualAck)` * Fix `NPE` in te main poll loop as `if (records != null && this.logger.isDebugEnabled())` (cherry picked from commit 44cc15f) --- .../kafka/listener/KafkaMessageListenerContainer.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8faa00b3..c54af872 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -71,6 +71,7 @@ import org.springframework.util.concurrent.ListenableFutureCallback; * @author Murali Reddy * @author Marius Bogoevici * @author Martin Dam + * @author Artem Bilan */ public class KafkaMessageListenerContainer extends AbstractMessageListenerContainer { @@ -406,7 +407,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener this.logger.trace("Polling (paused=" + this.paused + ")..."); } ConsumerRecords records = this.consumer.poll(this.containerProperties.getPollTimeout()); - if (this.logger.isDebugEnabled()) { + if (records != null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records"); } if (records != null && records.count() > 0) { @@ -599,7 +600,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener else { this.listener.onMessage(record); } - this.acks.add(record); + if (!this.isAnyManualAck) { + this.acks.add(record); + } if (this.isRecordAck) { this.consumer.wakeup(); }