diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index c54af872..1dd53fa8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -403,6 +403,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener long lastAlertAt = lastReceive; while (isRunning()) { try { + if (!this.autoCommit) { + processCommits(); + } if (this.logger.isTraceEnabled()) { this.logger.trace("Polling (paused=" + this.paused + ")..."); } @@ -444,9 +447,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } } this.unsent = checkPause(this.unsent); - if (!this.autoCommit) { - processCommits(); - } } catch (WakeupException e) { this.unsent = checkPause(this.unsent); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index e732ea81..ac8cce1b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -117,22 +117,22 @@ public class EnableKafkaIntegrationTests { template.send("annotated2", 0, 123, "foo"); template.flush(); - assertThat(this.listener.latch2.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.latch2.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.key).isEqualTo(123); assertThat(this.listener.partition).isNotNull(); assertThat(this.listener.topic).isEqualTo("annotated2"); template.send("annotated3", 0, "foo"); template.flush(); - assertThat(this.listener.latch3.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.latch3.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.record.value()).isEqualTo("foo"); template.send("annotated4", 0, "foo"); template.flush(); - assertThat(this.listener.latch4.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.latch4.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.record.value()).isEqualTo("foo"); assertThat(this.listener.ack).isNotNull(); - assertThat(this.listener.eventLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.eventLatch.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.event.getListenerId().startsWith("qux-")); MessageListenerContainer manualContainer = this.registry.getListenerContainer("qux"); assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener")) @@ -153,11 +153,11 @@ public class EnableKafkaIntegrationTests { template.send("annotated6", 0, 0, "baz"); template.send("annotated6", 1, 0, "qux"); template.flush(); - assertThat(this.listener.latch5.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.latch5.await(60, TimeUnit.SECONDS)).isTrue(); template.send("annotated11", 0, "foo"); template.flush(); - assertThat(this.listener.latch7.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.latch7.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.recordFilter.called).isTrue(); } @@ -208,7 +208,7 @@ public class EnableKafkaIntegrationTests { .setHeader(KafkaHeaders.PARTITION_ID, 0) .setHeader(KafkaHeaders.MESSAGE_KEY, 2) .build()); - assertThat(this.listener.latch6.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.latch6.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.foo.getBar()).isEqualTo("bar"); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index f05c2ce9..d9471b44 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -83,9 +84,11 @@ public class KafkaMessageListenerContainerTests { private static String topic6 = "testTopic6"; + private static String topic7 = "testTopic7"; + @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5, - topic6); + topic6, topic7); @Rule public TestName testName = new TestName(); @@ -494,6 +497,66 @@ public class KafkaMessageListenerContainerTests { logger.info("Stop record ack"); } + @Test + public void testBatchAck() throws Exception { + logger.info("Start batch ack"); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic7); + template.sendDefault(0, 0, "foo"); + template.sendDefault(0, 0, "baz"); + template.sendDefault(1, 0, "bar"); + template.sendDefault(1, 0, "qux"); + template.flush(); + + Map props = KafkaTestUtils.consumerProps("test6", "false", embeddedKafka); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProps = new ContainerProperties(topic7); + containerProps.setMessageListener((MessageListener) message -> { + logger.info("batch ack: " + message); + }); + containerProps.setSyncCommits(true); + containerProps.setAckMode(AckMode.BATCH); + containerProps.setPollTimeout(10000); + containerProps.setAckOnError(false); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, + containerProps); + container.setBeanName("testBatchAcks"); + container.start(); + Consumer containerConsumer = spyOnConsumer(container); + final CountDownLatch firstBatchLatch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(2); + willAnswer(invocation -> { + + @SuppressWarnings({ "unchecked" }) + Map map = (Map) invocation + .getArguments()[0]; + for (Entry entry : map.entrySet()) { + if (entry.getValue().offset() == 2) { + firstBatchLatch.countDown(); + latch.countDown(); + } + } + return invocation.callRealMethod(); + + }).given(containerConsumer) + .commitSync(any()); + + assertThat(firstBatchLatch.await(9, TimeUnit.SECONDS)).isTrue(); + + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); + Consumer consumer = cf.createConsumer(); + consumer.assign(Arrays.asList(new TopicPartition(topic7, 0), new TopicPartition(topic7, 1))); + assertThat(consumer.position(new TopicPartition(topic7, 0))).isEqualTo(2); + assertThat(consumer.position(new TopicPartition(topic7, 1))).isEqualTo(2); + container.stop(); + consumer.close(); + logger.info("Stop batch ack"); + } + private RetryTemplate buildRetry() { SimpleRetryPolicy policy = new SimpleRetryPolicy(3, Collections.singletonMap(FooEx.class, true)); RetryTemplate retryTemplate = new RetryTemplate();