GH-140: Commit in the Beginning of the Main Loop

Fixes GH-140 (https://github.com/spring-projects/spring-kafka/issues/140)

Previously with the pretty big `containerProperties.getPollTimeout()` we ended up with the issue of not acked commits for `BATCH` mode.
Just because the logic relies on the `consumer.wakeup()` which causes `WakeupException` breaking the main polling loop.

* Move `processCommits()` function to the beginning of main loop, before blocking on the `this.consumer.poll()`
* Increase `EnableKafkaIntegrationTests` wait timeouts

**Cherry-pick to 1.0.x**

Add `firstBatchLatch` to test to verify that the first batch is commit within the time less than 10 sec for poll
This commit is contained in:
Artem Bilan
2016-07-11 10:49:27 -04:00
committed by Gary Russell
parent cc7df884e2
commit d04b53a7ab
3 changed files with 74 additions and 11 deletions

View File

@@ -403,6 +403,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
long lastAlertAt = lastReceive;
while (isRunning()) {
try {
if (!this.autoCommit) {
processCommits();
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Polling (paused=" + this.paused + ")...");
}
@@ -444,9 +447,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
}
this.unsent = checkPause(this.unsent);
if (!this.autoCommit) {
processCommits();
}
}
catch (WakeupException e) {
this.unsent = checkPause(this.unsent);

View File

@@ -117,22 +117,22 @@ public class EnableKafkaIntegrationTests {
template.send("annotated2", 0, 123, "foo");
template.flush();
assertThat(this.listener.latch2.await(20, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.key).isEqualTo(123);
assertThat(this.listener.partition).isNotNull();
assertThat(this.listener.topic).isEqualTo("annotated2");
template.send("annotated3", 0, "foo");
template.flush();
assertThat(this.listener.latch3.await(20, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.latch3.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.record.value()).isEqualTo("foo");
template.send("annotated4", 0, "foo");
template.flush();
assertThat(this.listener.latch4.await(20, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.latch4.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.record.value()).isEqualTo("foo");
assertThat(this.listener.ack).isNotNull();
assertThat(this.listener.eventLatch.await(20, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.eventLatch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.event.getListenerId().startsWith("qux-"));
MessageListenerContainer manualContainer = this.registry.getListenerContainer("qux");
assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener"))
@@ -153,11 +153,11 @@ public class EnableKafkaIntegrationTests {
template.send("annotated6", 0, 0, "baz");
template.send("annotated6", 1, 0, "qux");
template.flush();
assertThat(this.listener.latch5.await(20, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.latch5.await(60, TimeUnit.SECONDS)).isTrue();
template.send("annotated11", 0, "foo");
template.flush();
assertThat(this.listener.latch7.await(20, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.latch7.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.recordFilter.called).isTrue();
}
@@ -208,7 +208,7 @@ public class EnableKafkaIntegrationTests {
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
.build());
assertThat(this.listener.latch6.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.latch6.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.foo.getBar()).isEqualTo("bar");
}

View File

@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -83,9 +84,11 @@ public class KafkaMessageListenerContainerTests {
private static String topic6 = "testTopic6";
private static String topic7 = "testTopic7";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
topic6);
topic6, topic7);
@Rule
public TestName testName = new TestName();
@@ -494,6 +497,66 @@ public class KafkaMessageListenerContainerTests {
logger.info("Stop record ack");
}
@Test
public void testBatchAck() throws Exception {
logger.info("Start batch ack");
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic7);
template.sendDefault(0, 0, "foo");
template.sendDefault(0, 0, "baz");
template.sendDefault(1, 0, "bar");
template.sendDefault(1, 0, "qux");
template.flush();
Map<String, Object> props = KafkaTestUtils.consumerProps("test6", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic7);
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
logger.info("batch ack: " + message);
});
containerProps.setSyncCommits(true);
containerProps.setAckMode(AckMode.BATCH);
containerProps.setPollTimeout(10000);
containerProps.setAckOnError(false);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProps);
container.setBeanName("testBatchAcks");
container.start();
Consumer<?, ?> containerConsumer = spyOnConsumer(container);
final CountDownLatch firstBatchLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(2);
willAnswer(invocation -> {
@SuppressWarnings({ "unchecked" })
Map<TopicPartition, OffsetAndMetadata> map = (Map<TopicPartition, OffsetAndMetadata>) invocation
.getArguments()[0];
for (Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
if (entry.getValue().offset() == 2) {
firstBatchLatch.countDown();
latch.countDown();
}
}
return invocation.callRealMethod();
}).given(containerConsumer)
.commitSync(any());
assertThat(firstBatchLatch.await(9, TimeUnit.SECONDS)).isTrue();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
Consumer<Integer, String> consumer = cf.createConsumer();
consumer.assign(Arrays.asList(new TopicPartition(topic7, 0), new TopicPartition(topic7, 1)));
assertThat(consumer.position(new TopicPartition(topic7, 0))).isEqualTo(2);
assertThat(consumer.position(new TopicPartition(topic7, 1))).isEqualTo(2);
container.stop();
consumer.close();
logger.info("Stop batch ack");
}
private RetryTemplate buildRetry() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(3, Collections.singletonMap(FooEx.class, true));
RetryTemplate retryTemplate = new RetryTemplate();