GH-199: Do not wakeup consumer after queueing ack

Fixes GH-199 (https://github.com/spring-projects/spring-kafka/issues/199)

Waking up the consumer too often will trigger a rebalance and messages are not actually committed
This commit is contained in:
Martin Dam
2016-11-02 18:28:31 -07:00
committed by Artem Bilan
parent 79fceba98b
commit c245cbbe89
2 changed files with 4 additions and 13 deletions

View File

@@ -772,9 +772,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
if (!this.isAnyManualAck && !this.autoCommit) {
this.acks.add(record);
}
if (this.isRecordAck) {
this.consumer.wakeup();
}
}
catch (Exception e) {
if (this.containerProperties.isAckOnError() && !this.autoCommit) {
@@ -792,9 +789,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
}
}
if (this.isManualAck || this.isBatchAck) {
this.consumer.wakeup();
}
}
private void processCommits() {
@@ -995,9 +989,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
ListenerConsumer.this.logger.debug("Interrupt ignored");
}
}
if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
ListenerConsumer.this.consumer.wakeup();
}
}
}
finally {

View File

@@ -571,7 +571,7 @@ public class KafkaMessageListenerContainerTests {
});
containerProps.setSyncCommits(true);
containerProps.setAckMode(AckMode.BATCH);
containerProps.setPollTimeout(10000);
containerProps.setPollTimeout(100);
containerProps.setAckOnError(false);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
@@ -640,7 +640,7 @@ public class KafkaMessageListenerContainerTests {
});
containerProps.setSyncCommits(true);
containerProps.setAckMode(AckMode.BATCH);
containerProps.setPollTimeout(10000);
containerProps.setPollTimeout(100);
containerProps.setAckOnError(false);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
@@ -714,7 +714,7 @@ public class KafkaMessageListenerContainerTests {
});
containerProps.setSyncCommits(true);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
containerProps.setPollTimeout(10000);
containerProps.setPollTimeout(100);
containerProps.setAckOnError(false);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
@@ -777,7 +777,7 @@ public class KafkaMessageListenerContainerTests {
});
containerProps.setSyncCommits(true);
containerProps.setAckMode(AckMode.BATCH);
containerProps.setPollTimeout(10000);
containerProps.setPollTimeout(100);
containerProps.setAckOnError(true);
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setGenericErrorHandler((BatchErrorHandler) (t, messages) -> {