diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 98515b62..8faa00b3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -541,19 +541,20 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } /** - * Process any manual acks that have been queued by the listener thread. + * Process any acks that have been queued by the listener thread. */ - private void handleManualAcks() { - if (ListenerConsumer.this.isAnyManualAck) { - ConsumerRecord record = this.acks.poll(); - while (record != null) { - manualAck(record); - record = this.acks.poll(); + private void handleAcks() { + ConsumerRecord record = this.acks.poll(); + while (record != null) { + if (this.logger.isTraceEnabled()) { + this.logger.trace("Ack: " + record); } + processAck(record); + record = this.acks.poll(); } } - private void manualAck(ConsumerRecord record) { + private void processAck(ConsumerRecord record) { if (ListenerConsumer.this.isManualImmediateAck) { try { ackImmediate(record); @@ -621,7 +622,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } private void processCommits() { - handleManualAcks(); + handleAcks(); this.count += this.acks.size(); long now; AckMode ackMode = this.containerProperties.getAckMode(); @@ -630,7 +631,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener updatePendingOffsets(); } boolean countExceeded = this.count >= this.containerProperties.getAckCount(); - if (this.isManualAck || this.isBatchAck + if (this.isManualAck || this.isBatchAck || this.isRecordAck || (ackMode.equals(AckMode.COUNT) && countExceeded)) { if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) { this.logger.debug("Committing in AckMode.COUNT because count " + this.count diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 347e89eb..0be60b6b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -609,6 +609,7 @@ public class ConcurrentMessageListenerContainerTests { // processed 'qux' // it has been updated even 'baz' failed assertThat(consumer.position(new TopicPartition(topic9, 1))).isEqualTo(2); + consumer.close(); logger.info("Stop ack on error"); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 55959d89..f05c2ce9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -24,10 +24,12 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.Arrays; import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +38,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.ClassRule; import org.junit.Rule; @@ -78,8 +81,11 @@ public class KafkaMessageListenerContainerTests { private static String topic5 = "testTopic5"; + private static String topic6 = "testTopic6"; + @ClassRule - public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5); + public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5, + topic6); @Rule public TestName testName = new TestName(); @@ -436,6 +442,58 @@ public class KafkaMessageListenerContainerTests { logger.info("Stop " + this.testName.getMethodName()); } + @Test + public void testRecordAck() throws Exception { + logger.info("Start record ack"); + Map props = KafkaTestUtils.consumerProps("test6", "false", embeddedKafka); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProps = new ContainerProperties(topic6); + containerProps.setMessageListener((MessageListener) message -> { + logger.info("record ack: " + message); + }); + containerProps.setSyncCommits(true); + containerProps.setAckMode(AckMode.RECORD); + containerProps.setAckOnError(false); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, + containerProps); + container.setBeanName("testRecordAcks"); + container.start(); + Consumer containerConsumer = spyOnConsumer(container); + final CountDownLatch latch = new CountDownLatch(2); + willAnswer(invocation -> { + + @SuppressWarnings({ "unchecked" }) + Map map = (Map) invocation + .getArguments()[0]; + for (Entry entry : map.entrySet()) { + if (entry.getValue().offset() == 2) { + latch.countDown(); + } + } + return invocation.callRealMethod(); + + }).given(containerConsumer) + .commitSync(any()); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic6); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 0, "bar"); + template.sendDefault(0, 0, "baz"); + template.sendDefault(1, 0, "qux"); + template.flush(); + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); + Consumer consumer = cf.createConsumer(); + consumer.assign(Arrays.asList(new TopicPartition(topic6, 0), new TopicPartition(topic6, 1))); + assertThat(consumer.position(new TopicPartition(topic6, 0))).isEqualTo(2); + assertThat(consumer.position(new TopicPartition(topic6, 1))).isEqualTo(2); + container.stop(); + consumer.close(); + logger.info("Stop record ack"); + } + private RetryTemplate buildRetry() { SimpleRetryPolicy policy = new SimpleRetryPolicy(3, Collections.singletonMap(FooEx.class, true)); RetryTemplate retryTemplate = new RetryTemplate();