GH-135: Fix Acks

Resolves #135

Properly transfer `ack`s from the listener thread to the consumer thread
for all ack modes.

Previously, record ack mode was not handled in `processCommits`.

Fix test according PR comments
This commit is contained in:
Gary Russell
2016-07-05 10:32:54 -04:00
committed by Artem Bilan
parent cce7ae9dc5
commit 677d135932
3 changed files with 71 additions and 11 deletions

View File

@@ -541,19 +541,20 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
/**
* Process any manual acks that have been queued by the listener thread.
* Process any acks that have been queued by the listener thread.
*/
private void handleManualAcks() {
if (ListenerConsumer.this.isAnyManualAck) {
ConsumerRecord<K, V> record = this.acks.poll();
while (record != null) {
manualAck(record);
record = this.acks.poll();
private void handleAcks() {
ConsumerRecord<K, V> record = this.acks.poll();
while (record != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Ack: " + record);
}
processAck(record);
record = this.acks.poll();
}
}
private void manualAck(ConsumerRecord<K, V> record) {
private void processAck(ConsumerRecord<K, V> record) {
if (ListenerConsumer.this.isManualImmediateAck) {
try {
ackImmediate(record);
@@ -621,7 +622,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
private void processCommits() {
handleManualAcks();
handleAcks();
this.count += this.acks.size();
long now;
AckMode ackMode = this.containerProperties.getAckMode();
@@ -630,7 +631,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
updatePendingOffsets();
}
boolean countExceeded = this.count >= this.containerProperties.getAckCount();
if (this.isManualAck || this.isBatchAck
if (this.isManualAck || this.isBatchAck || this.isRecordAck
|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {
if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
this.logger.debug("Committing in AckMode.COUNT because count " + this.count

View File

@@ -609,6 +609,7 @@ public class ConcurrentMessageListenerContainerTests {
// processed 'qux'
// it has been updated even 'baz' failed
assertThat(consumer.position(new TopicPartition(topic9, 1))).isEqualTo(2);
consumer.close();
logger.info("Stop ack on error");
}

View File

@@ -24,10 +24,12 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +38,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -78,8 +81,11 @@ public class KafkaMessageListenerContainerTests {
private static String topic5 = "testTopic5";
private static String topic6 = "testTopic6";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5);
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
topic6);
@Rule
public TestName testName = new TestName();
@@ -436,6 +442,58 @@ public class KafkaMessageListenerContainerTests {
logger.info("Stop " + this.testName.getMethodName());
}
@Test
public void testRecordAck() throws Exception {
logger.info("Start record ack");
Map<String, Object> props = KafkaTestUtils.consumerProps("test6", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic6);
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
logger.info("record ack: " + message);
});
containerProps.setSyncCommits(true);
containerProps.setAckMode(AckMode.RECORD);
containerProps.setAckOnError(false);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProps);
container.setBeanName("testRecordAcks");
container.start();
Consumer<?, ?> containerConsumer = spyOnConsumer(container);
final CountDownLatch latch = new CountDownLatch(2);
willAnswer(invocation -> {
@SuppressWarnings({ "unchecked" })
Map<TopicPartition, OffsetAndMetadata> map = (Map<TopicPartition, OffsetAndMetadata>) invocation
.getArguments()[0];
for (Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
if (entry.getValue().offset() == 2) {
latch.countDown();
}
}
return invocation.callRealMethod();
}).given(containerConsumer)
.commitSync(any());
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic6);
template.sendDefault(0, 0, "foo");
template.sendDefault(1, 0, "bar");
template.sendDefault(0, 0, "baz");
template.sendDefault(1, 0, "qux");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
Consumer<Integer, String> consumer = cf.createConsumer();
consumer.assign(Arrays.asList(new TopicPartition(topic6, 0), new TopicPartition(topic6, 1)));
assertThat(consumer.position(new TopicPartition(topic6, 0))).isEqualTo(2);
assertThat(consumer.position(new TopicPartition(topic6, 1))).isEqualTo(2);
container.stop();
consumer.close();
logger.info("Stop record ack");
}
private RetryTemplate buildRetry() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(3, Collections.singletonMap(FooEx.class, true));
RetryTemplate retryTemplate = new RetryTemplate();