GH-141: Add MANUAL acks only in acknowledge()
Fixes GH-141 (https://github.com/spring-projects/spring-kafka/issues/141)
The `MANUAL` acks are intended to be acknowledged only by the end-listener initiative.
An unconditional `this.acks.add(record)` is in the `invokeListener()` function by mistake.
* Wrap `this.acks.add(record)` in the `invokeListener()` to `if (!this.isAnyManualAck)`
* Fix `NPE` in te main poll loop as `if (records != null && this.logger.isDebugEnabled())`
(cherry picked from commit 44cc15f)
This commit is contained in:
@@ -71,6 +71,7 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
* @author Murali Reddy
|
||||
* @author Marius Bogoevici
|
||||
* @author Martin Dam
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
|
||||
|
||||
@@ -406,7 +407,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
this.logger.trace("Polling (paused=" + this.paused + ")...");
|
||||
}
|
||||
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
if (records != null && this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Received: " + records.count() + " records");
|
||||
}
|
||||
if (records != null && records.count() > 0) {
|
||||
@@ -599,7 +600,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
else {
|
||||
this.listener.onMessage(record);
|
||||
}
|
||||
this.acks.add(record);
|
||||
if (!this.isAnyManualAck) {
|
||||
this.acks.add(record);
|
||||
}
|
||||
if (this.isRecordAck) {
|
||||
this.consumer.wakeup();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user