GH-141: Add MANUAL acks only in acknowledge()

Fixes GH-141 (https://github.com/spring-projects/spring-kafka/issues/141)

The `MANUAL` acks are intended to be acknowledged only by the end-listener initiative.
An unconditional `this.acks.add(record)` is in the `invokeListener()` function by mistake.

* Wrap `this.acks.add(record)` in the `invokeListener()` to `if (!this.isAnyManualAck)`
* Fix `NPE` in te main poll loop as `if (records != null && this.logger.isDebugEnabled())`
This commit is contained in:
Artem Bilan
2016-07-07 20:32:39 -04:00
parent aa4db96289
commit 44cc15f952

View File

@@ -71,6 +71,7 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
* @author Murali Reddy
* @author Marius Bogoevici
* @author Martin Dam
* @author Artem Bilan
*/
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
@@ -406,7 +407,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
this.logger.trace("Polling (paused=" + this.paused + ")...");
}
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
if (this.logger.isDebugEnabled()) {
if (records != null && this.logger.isDebugEnabled()) {
this.logger.debug("Received: " + records.count() + " records");
}
if (records != null && records.count() > 0) {
@@ -599,7 +600,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
else {
this.listener.onMessage(record);
}
this.acks.add(record);
if (!this.isAnyManualAck) {
this.acks.add(record);
}
if (this.isRecordAck) {
this.consumer.wakeup();
}