GH-135: Fix Acks
Resolves #135 Properly transfer `ack`s from the listener thread to the consumer thread for all ack modes. Previously, record ack mode was not handled in `processCommits`. Fix test according PR comments
This commit is contained in:
committed by
Artem Bilan
parent
cce7ae9dc5
commit
677d135932
@@ -541,19 +541,20 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
}
|
||||
|
||||
/**
|
||||
* Process any manual acks that have been queued by the listener thread.
|
||||
* Process any acks that have been queued by the listener thread.
|
||||
*/
|
||||
private void handleManualAcks() {
|
||||
if (ListenerConsumer.this.isAnyManualAck) {
|
||||
ConsumerRecord<K, V> record = this.acks.poll();
|
||||
while (record != null) {
|
||||
manualAck(record);
|
||||
record = this.acks.poll();
|
||||
private void handleAcks() {
|
||||
ConsumerRecord<K, V> record = this.acks.poll();
|
||||
while (record != null) {
|
||||
if (this.logger.isTraceEnabled()) {
|
||||
this.logger.trace("Ack: " + record);
|
||||
}
|
||||
processAck(record);
|
||||
record = this.acks.poll();
|
||||
}
|
||||
}
|
||||
|
||||
private void manualAck(ConsumerRecord<K, V> record) {
|
||||
private void processAck(ConsumerRecord<K, V> record) {
|
||||
if (ListenerConsumer.this.isManualImmediateAck) {
|
||||
try {
|
||||
ackImmediate(record);
|
||||
@@ -621,7 +622,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
}
|
||||
|
||||
private void processCommits() {
|
||||
handleManualAcks();
|
||||
handleAcks();
|
||||
this.count += this.acks.size();
|
||||
long now;
|
||||
AckMode ackMode = this.containerProperties.getAckMode();
|
||||
@@ -630,7 +631,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
updatePendingOffsets();
|
||||
}
|
||||
boolean countExceeded = this.count >= this.containerProperties.getAckCount();
|
||||
if (this.isManualAck || this.isBatchAck
|
||||
if (this.isManualAck || this.isBatchAck || this.isRecordAck
|
||||
|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {
|
||||
if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
|
||||
this.logger.debug("Committing in AckMode.COUNT because count " + this.count
|
||||
|
||||
@@ -609,6 +609,7 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
// processed 'qux'
|
||||
// it has been updated even 'baz' failed
|
||||
assertThat(consumer.position(new TopicPartition(topic9, 1))).isEqualTo(2);
|
||||
consumer.close();
|
||||
logger.info("Stop ack on error");
|
||||
}
|
||||
|
||||
|
||||
@@ -24,10 +24,12 @@ import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@@ -36,6 +38,7 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
@@ -78,8 +81,11 @@ public class KafkaMessageListenerContainerTests {
|
||||
|
||||
private static String topic5 = "testTopic5";
|
||||
|
||||
private static String topic6 = "testTopic6";
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5);
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
|
||||
topic6);
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
@@ -436,6 +442,58 @@ public class KafkaMessageListenerContainerTests {
|
||||
logger.info("Stop " + this.testName.getMethodName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecordAck() throws Exception {
|
||||
logger.info("Start record ack");
|
||||
Map<String, Object> props = KafkaTestUtils.consumerProps("test6", "false", embeddedKafka);
|
||||
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
ContainerProperties containerProps = new ContainerProperties(topic6);
|
||||
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
logger.info("record ack: " + message);
|
||||
});
|
||||
containerProps.setSyncCommits(true);
|
||||
containerProps.setAckMode(AckMode.RECORD);
|
||||
containerProps.setAckOnError(false);
|
||||
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
|
||||
containerProps);
|
||||
container.setBeanName("testRecordAcks");
|
||||
container.start();
|
||||
Consumer<?, ?> containerConsumer = spyOnConsumer(container);
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
willAnswer(invocation -> {
|
||||
|
||||
@SuppressWarnings({ "unchecked" })
|
||||
Map<TopicPartition, OffsetAndMetadata> map = (Map<TopicPartition, OffsetAndMetadata>) invocation
|
||||
.getArguments()[0];
|
||||
for (Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
|
||||
if (entry.getValue().offset() == 2) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
return invocation.callRealMethod();
|
||||
|
||||
}).given(containerConsumer)
|
||||
.commitSync(any());
|
||||
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
|
||||
template.setDefaultTopic(topic6);
|
||||
template.sendDefault(0, 0, "foo");
|
||||
template.sendDefault(1, 0, "bar");
|
||||
template.sendDefault(0, 0, "baz");
|
||||
template.sendDefault(1, 0, "qux");
|
||||
template.flush();
|
||||
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
Consumer<Integer, String> consumer = cf.createConsumer();
|
||||
consumer.assign(Arrays.asList(new TopicPartition(topic6, 0), new TopicPartition(topic6, 1)));
|
||||
assertThat(consumer.position(new TopicPartition(topic6, 0))).isEqualTo(2);
|
||||
assertThat(consumer.position(new TopicPartition(topic6, 1))).isEqualTo(2);
|
||||
container.stop();
|
||||
consumer.close();
|
||||
logger.info("Stop record ack");
|
||||
}
|
||||
|
||||
private RetryTemplate buildRetry() {
|
||||
SimpleRetryPolicy policy = new SimpleRetryPolicy(3, Collections.singletonMap(FooEx.class, true));
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
|
||||
Reference in New Issue
Block a user