diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8faa00b3..c54af872 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -71,6 +71,7 @@ import org.springframework.util.concurrent.ListenableFutureCallback; * @author Murali Reddy * @author Marius Bogoevici * @author Martin Dam + * @author Artem Bilan */ public class KafkaMessageListenerContainer extends AbstractMessageListenerContainer { @@ -406,7 +407,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener this.logger.trace("Polling (paused=" + this.paused + ")..."); } ConsumerRecords records = this.consumer.poll(this.containerProperties.getPollTimeout()); - if (this.logger.isDebugEnabled()) { + if (records != null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records"); } if (records != null && records.count() > 0) { @@ -599,7 +600,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener else { this.listener.onMessage(record); } - this.acks.add(record); + if (!this.isAnyManualAck) { + this.acks.add(record); + } if (this.isRecordAck) { this.consumer.wakeup(); }