From 15102ddb5eeb18cc5c874a206f33b93d358aa7bc Mon Sep 17 00:00:00 2001 From: Martin Dam Date: Wed, 2 Nov 2016 21:28:31 -0400 Subject: [PATCH] GH-199: Do not wakeup consumer after queueing ack Fixes GH-199 (https://github.com/spring-projects/spring-kafka/issues/199) Waking up the consumer too often will trigger a rebalance and messages are not actually committed (cherry picked from commit c245cbb) --- .../kafka/listener/KafkaMessageListenerContainer.java | 9 --------- .../listener/KafkaMessageListenerContainerTests.java | 2 +- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index fcd4ec16..b28ce83a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -603,9 +603,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener if (!this.isAnyManualAck && !this.autoCommit) { this.acks.add(record); } - if (this.isRecordAck) { - this.consumer.wakeup(); - } } catch (Exception e) { if (this.containerProperties.isAckOnError() && !this.autoCommit) { @@ -619,9 +616,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } } } - if (this.isManualAck || this.isBatchAck) { - this.consumer.wakeup(); - } } private void processCommits() { @@ -792,9 +786,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener ListenerConsumer.this.logger.debug("Interrupt ignored"); } } - if (!ListenerConsumer.this.isManualImmediateAck && this.active) { - ListenerConsumer.this.consumer.wakeup(); - } } } finally { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 67d47836..c095d687 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -525,7 +525,7 @@ public class KafkaMessageListenerContainerTests { }); containerProps.setSyncCommits(true); containerProps.setAckMode(AckMode.BATCH); - containerProps.setPollTimeout(10000); + containerProps.setPollTimeout(100); containerProps.setAckOnError(false); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf,