GH-199: Do not wakeup consumer after queueing ack
Fixes GH-199 (https://github.com/spring-projects/spring-kafka/issues/199)
Waking up the consumer too often will trigger a rebalance and messages are not actually committed
(cherry picked from commit c245cbb)
This commit is contained in:
@@ -603,9 +603,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
if (!this.isAnyManualAck && !this.autoCommit) {
|
||||
this.acks.add(record);
|
||||
}
|
||||
if (this.isRecordAck) {
|
||||
this.consumer.wakeup();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (this.containerProperties.isAckOnError() && !this.autoCommit) {
|
||||
@@ -619,9 +616,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
}
|
||||
}
|
||||
}
|
||||
if (this.isManualAck || this.isBatchAck) {
|
||||
this.consumer.wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
private void processCommits() {
|
||||
@@ -792,9 +786,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
ListenerConsumer.this.logger.debug("Interrupt ignored");
|
||||
}
|
||||
}
|
||||
if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
|
||||
ListenerConsumer.this.consumer.wakeup();
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
|
||||
@@ -525,7 +525,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
});
|
||||
containerProps.setSyncCommits(true);
|
||||
containerProps.setAckMode(AckMode.BATCH);
|
||||
containerProps.setPollTimeout(10000);
|
||||
containerProps.setPollTimeout(100);
|
||||
containerProps.setAckOnError(false);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
|
||||
|
||||
Reference in New Issue
Block a user