GH-161; Memory Leak with autoCommit
Fixes #161 We should not add to the `acks` collection when autoCommit is true. Polishing - PR Comments
This commit is contained in:
committed by
Artem Bilan
parent
ad62b09732
commit
9e0e248ba5
@@ -599,7 +599,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
else {
|
||||
this.listener.onMessage(record);
|
||||
}
|
||||
if (!this.isAnyManualAck) {
|
||||
if (!this.isAnyManualAck && !this.autoCommit) {
|
||||
this.acks.add(record);
|
||||
}
|
||||
if (this.isRecordAck) {
|
||||
@@ -607,7 +607,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (this.containerProperties.isAckOnError()) {
|
||||
if (this.containerProperties.isAckOnError() && !this.autoCommit) {
|
||||
this.acks.add(record);
|
||||
}
|
||||
if (this.containerProperties.getErrorHandler() != null) {
|
||||
@@ -853,6 +853,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
@Override
|
||||
public void acknowledge() {
|
||||
try {
|
||||
if (ListenerConsumer.this.autoCommit) {
|
||||
throw new IllegalStateException("Manual acks are not allowed when auto commit is used");
|
||||
}
|
||||
ListenerConsumer.this.acks.put(this.record);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
|
||||
@@ -130,6 +130,14 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
template.flush();
|
||||
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(listenerThreadNames).allMatch(threadName -> threadName.contains("-consumer-"));
|
||||
@SuppressWarnings("unchecked")
|
||||
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
|
||||
"containers", List.class);
|
||||
assertThat(containers.size()).isEqualTo(2);
|
||||
for (int i = 0; i < 2; i++) {
|
||||
assertThat(KafkaTestUtils.getPropertyValue(containers.get(i), "listenerConsumer.acks", Collection.class)
|
||||
.size()).isEqualTo(0);
|
||||
}
|
||||
container.stop();
|
||||
this.logger.info("Stop auto");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user