GH-199: Do not wakeup consumer after queueing ack
Fixes GH-199 (https://github.com/spring-projects/spring-kafka/issues/199) Waking up the consumer too often will trigger a rebalance and messages are not actually committed
This commit is contained in:
@@ -772,9 +772,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
if (!this.isAnyManualAck && !this.autoCommit) {
|
||||
this.acks.add(record);
|
||||
}
|
||||
if (this.isRecordAck) {
|
||||
this.consumer.wakeup();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (this.containerProperties.isAckOnError() && !this.autoCommit) {
|
||||
@@ -792,9 +789,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
}
|
||||
}
|
||||
}
|
||||
if (this.isManualAck || this.isBatchAck) {
|
||||
this.consumer.wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
private void processCommits() {
|
||||
@@ -995,9 +989,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
ListenerConsumer.this.logger.debug("Interrupt ignored");
|
||||
}
|
||||
}
|
||||
if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
|
||||
ListenerConsumer.this.consumer.wakeup();
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
|
||||
@@ -571,7 +571,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
});
|
||||
containerProps.setSyncCommits(true);
|
||||
containerProps.setAckMode(AckMode.BATCH);
|
||||
containerProps.setPollTimeout(10000);
|
||||
containerProps.setPollTimeout(100);
|
||||
containerProps.setAckOnError(false);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
|
||||
@@ -640,7 +640,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
});
|
||||
containerProps.setSyncCommits(true);
|
||||
containerProps.setAckMode(AckMode.BATCH);
|
||||
containerProps.setPollTimeout(10000);
|
||||
containerProps.setPollTimeout(100);
|
||||
containerProps.setAckOnError(false);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
|
||||
@@ -714,7 +714,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
});
|
||||
containerProps.setSyncCommits(true);
|
||||
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
|
||||
containerProps.setPollTimeout(10000);
|
||||
containerProps.setPollTimeout(100);
|
||||
containerProps.setAckOnError(false);
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
|
||||
@@ -777,7 +777,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
});
|
||||
containerProps.setSyncCommits(true);
|
||||
containerProps.setAckMode(AckMode.BATCH);
|
||||
containerProps.setPollTimeout(10000);
|
||||
containerProps.setPollTimeout(100);
|
||||
containerProps.setAckOnError(true);
|
||||
final CountDownLatch latch = new CountDownLatch(4);
|
||||
containerProps.setGenericErrorHandler((BatchErrorHandler) (t, messages) -> {
|
||||
|
||||
Reference in New Issue
Block a user