GH-161; Memory Leak with autoCommit
Fixes #161
We should not add to the `acks` collection when autoCommit is true.
Polishing - PR Comments
(cherry picked from commit 9e0e248)
This commit is contained in:
committed by
Artem Bilan
parent
989d6b5ce6
commit
ed6523be06
@@ -600,7 +600,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
else {
|
||||
this.listener.onMessage(record);
|
||||
}
|
||||
if (!this.isAnyManualAck) {
|
||||
if (!this.isAnyManualAck && !this.autoCommit) {
|
||||
this.acks.add(record);
|
||||
}
|
||||
if (this.isRecordAck) {
|
||||
@@ -608,7 +608,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (this.containerProperties.isAckOnError()) {
|
||||
if (this.containerProperties.isAckOnError() && !this.autoCommit) {
|
||||
this.acks.add(record);
|
||||
}
|
||||
if (this.containerProperties.getErrorHandler() != null) {
|
||||
@@ -848,6 +848,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
@Override
|
||||
public void acknowledge() {
|
||||
try {
|
||||
if (ListenerConsumer.this.autoCommit) {
|
||||
throw new IllegalStateException("Manual acks are not allowed when auto commit is used");
|
||||
}
|
||||
ListenerConsumer.this.acks.put(this.record);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
|
||||
@@ -130,6 +130,14 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
template.flush();
|
||||
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(listenerThreadNames).allMatch(threadName -> threadName.contains("-consumer-"));
|
||||
@SuppressWarnings("unchecked")
|
||||
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
|
||||
"containers", List.class);
|
||||
assertThat(containers.size()).isEqualTo(2);
|
||||
for (int i = 0; i < 2; i++) {
|
||||
assertThat(KafkaTestUtils.getPropertyValue(containers.get(i), "listenerConsumer.acks", Collection.class)
|
||||
.size()).isEqualTo(0);
|
||||
}
|
||||
container.stop();
|
||||
this.logger.info("Stop auto");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user