GH-117: Move the placement of manual ack handling

Fixes GH-117 (https://github.com/spring-projects/spring-kafka/issues/117)

- merge `handleManualAcks`
- simplify offset handling by removing the `manualOffsets` map
- ensure that all acks are flushed on `stop()`
- Upgrade to Gradle 2.14
This commit is contained in:
Marius Bogoevici
2016-06-17 18:12:53 -04:00
committed by Artem Bilan
parent 0e164d8678
commit be2e6ce984
4 changed files with 100 additions and 45 deletions

Binary file not shown.

View File

@@ -1,6 +1,6 @@
#Mon May 16 18:24:00 EDT 2016 #Mon Jun 20 11:56:44 EDT 2016
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip

View File

@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@@ -219,8 +217,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private final Consumer<K, V> consumer; private final Consumer<K, V> consumer;
private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> manualOffsets = new ConcurrentHashMap<>();
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>(); private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
private final MessageListener<K, V> listener; private final MessageListener<K, V> listener;
@@ -309,12 +305,17 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
ListenerConsumer.this.assignedPartitions = partitions; ListenerConsumer.this.assignedPartitions = partitions;
if (!ListenerConsumer.this.autoCommit) { if (!ListenerConsumer.this.autoCommit) {
// Commit initial positions - while this is generally redundant // Commit initial positions - this is generally redundant but
// it protects us from the case when another consumer starts // it protects us from the case when another consumer starts
// and rebalance would cause it to reset at the end
// see https://github.com/spring-projects/spring-kafka/issues/110
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : partitions) { for (TopicPartition partition : partitions) {
offsets.put(partition, new OffsetAndMetadata(consumer.position(partition))); offsets.put(partition, new OffsetAndMetadata(consumer.position(partition)));
} }
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Committing: " + offsets);
}
if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) { if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
ListenerConsumer.this.consumer.commitSync(offsets); ListenerConsumer.this.consumer.commitSync(offsets);
} }
@@ -398,7 +399,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
if (this.containerProperties.getIdleEventInterval() != null) { if (this.containerProperties.getIdleEventInterval() != null) {
lastReceive = System.currentTimeMillis(); lastReceive = System.currentTimeMillis();
} }
handleManualAcks();
// if the container is set to auto-commit, then execute in the // if the container is set to auto-commit, then execute in the
// same thread // same thread
// otherwise send to the buffering queue // otherwise send to the buffering queue
@@ -487,8 +487,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
finally { finally {
this.listenerInvokerFuture = null; this.listenerInvokerFuture = null;
} }
// handle the last manual acks, after the listeners have closed
handleManualAcks();
processCommits(); processCommits();
if (this.offsets.size() > 0) { if (this.offsets.size() > 0) {
// we always commit after stopping the invoker // we always commit after stopping the invoker
@@ -551,7 +549,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
} }
} }
else { else {
updateManualOffset(record); addOffset(record);
} }
} }
@@ -605,6 +603,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
} }
private void processCommits() { private void processCommits() {
handleManualAcks();
this.count += this.acks.size(); this.count += this.acks.size();
long now; long now;
AckMode ackMode = this.containerProperties.getAckMode(); AckMode ackMode = this.containerProperties.getAckMode();
@@ -614,6 +613,10 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
} }
boolean countExceeded = this.count >= this.containerProperties.getAckCount(); boolean countExceeded = this.count >= this.containerProperties.getAckCount();
if (ackMode.equals(AckMode.BATCH) || ackMode.equals(AckMode.COUNT) && countExceeded) { if (ackMode.equals(AckMode.BATCH) || ackMode.equals(AckMode.COUNT) && countExceeded) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Committing in AckMode.COUNT because count " + this.count
+ " exceeds configured limit of" + this.containerProperties.getAckCount());
}
commitIfNecessary(); commitIfNecessary();
this.count = 0; this.count = 0;
} }
@@ -621,11 +624,27 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
now = System.currentTimeMillis(); now = System.currentTimeMillis();
boolean elapsed = now - this.last > this.containerProperties.getAckTime(); boolean elapsed = now - this.last > this.containerProperties.getAckTime();
if (ackMode.equals(AckMode.TIME) && elapsed) { if (ackMode.equals(AckMode.TIME) && elapsed) {
if (this.logger.isDebugEnabled()) {
this.logger
.debug("Committing in AckMode.TIME because time elapsed exceeds configured limit of "
+ this.containerProperties.getAckTime());
}
commitIfNecessary(); commitIfNecessary();
this.last = now; this.last = now;
} }
else if ((ackMode.equals(AckMode.COUNT_TIME) || this.isManualAck) else if ((ackMode.equals(AckMode.COUNT_TIME) || this.isManualAck) && (elapsed || countExceeded)) {
&& (elapsed || countExceeded)) { if (this.logger.isDebugEnabled()) {
if (elapsed) {
this.logger.debug("Committing in AckMode." + ackMode.name() + " because time elapsed "
+ "exceeds configured limit of " + this.containerProperties.getAckTime());
}
else {
this.logger.debug("Committing in AckMode." + ackMode.name() + " because count "
+ this.count + " exceeds configured limit of"
+ this.containerProperties.getAckCount());
}
}
commitIfNecessary(); commitIfNecessary();
this.last = now; this.last = now;
this.count = 0; this.count = 0;
@@ -674,39 +693,22 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
this.offsets.get(record.topic()).put(record.partition(), record.offset()); this.offsets.get(record.topic()).put(record.partition(), record.offset());
} }
private void updateManualOffset(ConsumerRecord<K, V> record) {
if (!this.manualOffsets.containsKey(record.topic())) {
this.manualOffsets.putIfAbsent(record.topic(), new ConcurrentHashMap<Integer, Long>());
}
this.manualOffsets.get(record.topic()).put(record.partition(), record.offset());
}
private void commitIfNecessary() { private void commitIfNecessary() {
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>(); Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
if (this.isManualAck) { for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
for (Entry<String, ConcurrentMap<Integer, Long>> entry : this.manualOffsets.entrySet()) { for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
Iterator<Entry<Integer, Long>> iterator = entry.getValue().entrySet().iterator(); commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
while (iterator.hasNext()) { new OffsetAndMetadata(offset.getValue() + 1));
Entry<Integer, Long> offset = iterator.next();
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
new OffsetAndMetadata(offset.getValue() + 1));
iterator.remove();
}
}
}
else {
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
new OffsetAndMetadata(offset.getValue() + 1));
}
} }
} }
this.offsets.clear(); this.offsets.clear();
if (this.logger.isDebugEnabled()) { if (this.logger.isDebugEnabled()) {
this.logger.debug("Committing: " + commits); this.logger.debug("Commit list: " + commits);
} }
if (!commits.isEmpty()) { if (!commits.isEmpty()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Committing: " + commits);
}
try { try {
if (this.containerProperties.isSyncCommits()) { if (this.containerProperties.isSyncCommits()) {
this.consumer.commitSync(commits); this.consumer.commitSync(commits);
@@ -717,6 +719,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
} }
catch (WakeupException e) { catch (WakeupException e) {
// ignore - not polling // ignore - not polling
if (this.logger.isDebugEnabled()) {
this.logger.debug("Woken up during commit");
}
} }
} }
} }
@@ -793,6 +798,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
} }
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Invoker stopped");
}
} }
} }

View File

@@ -18,9 +18,10 @@ package org.springframework.kafka.listener;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import java.util.BitSet; import java.util.BitSet;
@@ -207,7 +208,7 @@ public class KafkaMessageListenerContainerTests {
ContainerProperties containerProps = new ContainerProperties(topic5); ContainerProperties containerProps = new ContainerProperties(topic5);
containerProps.setAckCount(1); containerProps.setAckCount(1);
containerProps.setPauseAfter(100); containerProps.setPauseAfter(100);
containerProps.setAckMode(AckMode.MANUAL); containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC);
KafkaMessageListenerContainer<Integer, String> container = KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps); new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(3); final CountDownLatch latch = new CountDownLatch(3);
@@ -232,22 +233,68 @@ public class KafkaMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps); ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf); KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic5); template.setDefaultTopic(topic5);
template.sendDefault(0, "foo"); template.sendDefault(0, 0, "foo");
template.sendDefault(2, "bar"); template.sendDefault(1, 2, "bar");
template.flush(); template.flush();
Thread.sleep(300); Thread.sleep(300);
template.sendDefault(0, "fiz"); template.sendDefault(0, 0, "fiz");
template.sendDefault(2, "buz"); template.sendDefault(1, 2, "buz");
template.flush(); template.flush();
// Verify that commitSync is called when paused // Verify that commitSync is called when paused
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
verify(consumer, atLeastOnce()).pause(any(TopicPartition.class), any(TopicPartition.class)); verify(consumer, atLeastOnce()).pause(any(TopicPartition.class), any(TopicPartition.class));
verify(consumer, atMost(2)).commitSync(any()); verify(consumer, atLeast(2)).commitSync(any());
verify(consumer, atLeastOnce()).resume(any(TopicPartition.class), any(TopicPartition.class)); verify(consumer, atLeastOnce()).resume(any(TopicPartition.class), any(TopicPartition.class));
container.stop(); container.stop();
} }
@Test
public void testCommitsAreFlushedOnStop() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("flushedOnStop", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic5);
containerProps.setAckCount(1);
containerProps.setPauseAfter(100);
// set large values, ensuring that commits don't happen before `stop()`
containerProps.setAckTime(20000);
containerProps.setAckCount(20000);
containerProps.setAckMode(AckMode.COUNT_TIME);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(4);
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
logger.info("slow: " + message);
ack.acknowledge();
latch.countDown();
});
container.setBeanName("testManualFlushed");
container.start();
Consumer<?, ?> consumer = spyOnConsumer(container);
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic5);
template.sendDefault(0, 0, "foo");
template.sendDefault(1, 2, "bar");
template.flush();
Thread.sleep(300);
template.sendDefault(0, 0, "fiz");
template.sendDefault(1, 2, "buz");
template.flush();
// Verify that commitSync is called when paused
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
// Verify that just the initial commit is processed before stop
verify(consumer, times(1)).commitSync(any());
container.stop();
// Verify that a commit has been made on stop
verify(consumer, times(2)).commitSync(any());
}
@Test @Test
public void testSlowConsumerWithException() throws Exception { public void testSlowConsumerWithException() throws Exception {
logger.info("Start " + this.testName.getMethodName()); logger.info("Start " + this.testName.getMethodName());