GH-161; Memory Leak with autoCommit

Fixes #161

We should not add to the `acks` collection when autoCommit is true.

Polishing - PR Comments
This commit is contained in:
Gary Russell
2016-08-03 17:22:38 -07:00
committed by Artem Bilan
parent ad62b09732
commit 9e0e248ba5
2 changed files with 13 additions and 2 deletions

View File

@@ -599,7 +599,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
else {
this.listener.onMessage(record);
}
if (!this.isAnyManualAck) {
if (!this.isAnyManualAck && !this.autoCommit) {
this.acks.add(record);
}
if (this.isRecordAck) {
@@ -607,7 +607,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
}
catch (Exception e) {
if (this.containerProperties.isAckOnError()) {
if (this.containerProperties.isAckOnError() && !this.autoCommit) {
this.acks.add(record);
}
if (this.containerProperties.getErrorHandler() != null) {
@@ -853,6 +853,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
@Override
public void acknowledge() {
try {
if (ListenerConsumer.this.autoCommit) {
throw new IllegalStateException("Manual acks are not allowed when auto commit is used");
}
ListenerConsumer.this.acks.put(this.record);
}
catch (InterruptedException e) {

View File

@@ -130,6 +130,14 @@ public class ConcurrentMessageListenerContainerTests {
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(listenerThreadNames).allMatch(threadName -> threadName.contains("-consumer-"));
@SuppressWarnings("unchecked")
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
"containers", List.class);
assertThat(containers.size()).isEqualTo(2);
for (int i = 0; i < 2; i++) {
assertThat(KafkaTestUtils.getPropertyValue(containers.get(i), "listenerConsumer.acks", Collection.class)
.size()).isEqualTo(0);
}
container.stop();
this.logger.info("Stop auto");
}