GH-108: Stop the Invoker in the Middle of a Batch

- Fixes #110, committing the initial state of a consumer

Polishing according PR comments

Don't Interrupt invoker unless he doesn't stop

Logging and Clear Unsent
This commit is contained in:
Marius Bogoevici
2016-06-01 18:12:17 -04:00
committed by Gary Russell
parent e7240e207d
commit 67ffc2e331
5 changed files with 115 additions and 11 deletions

View File

@@ -32,4 +32,5 @@ public interface ConsumerFactory<K, V> {
boolean isAutoCommit();
}

View File

@@ -27,6 +27,7 @@ import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -247,6 +248,8 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private volatile Collection<TopicPartition> definedPartitions;
private ConsumerRecords<K, V> unsent;
private volatile Collection<TopicPartition> assignedPartitions;
private int count;
@@ -268,7 +271,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
long recentOffset) {
Assert.state(!this.isAnyManualAck || !this.autoCommit,
"Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@@ -284,6 +287,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
if (ListenerConsumer.this.listenerInvokerFuture != null) {
stopInvokerAndCommitManualAcks();
ListenerConsumer.this.recordsToProcess.clear();
ListenerConsumer.this.unsent = null;
}
else {
if (!CollectionUtils.isEmpty(partitions)) {
@@ -305,6 +309,21 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
ListenerConsumer.this.assignedPartitions = partitions;
if (!ListenerConsumer.this.autoCommit) {
// Commit initial positions - while this is generally redundant
// it protects us from the case when another consumer starts
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : partitions) {
offsets.put(partition, new OffsetAndMetadata(consumer.position(partition)));
}
if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
ListenerConsumer.this.consumer.commitSync(offsets);
}
else {
ListenerConsumer.this.consumer.commitAsync(offsets,
KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback());
}
}
// We will not start the invoker thread if we are in autocommit mode,
// as we will execute synchronously then
// We will not start the invoker thread if the container is stopped
@@ -363,7 +382,6 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
startInvoker();
}
}
ConsumerRecords<K, V> unsent = null;
long lastReceive = System.currentTimeMillis();
long lastAlertAt = lastReceive;
while (isRunning()) {
@@ -394,7 +412,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
this.consumer.pause(this.assignedPartitions
.toArray(new TopicPartition[this.assignedPartitions.size()]));
this.paused = true;
unsent = records;
this.unsent = records;
}
}
}
@@ -409,13 +427,13 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
}
}
unsent = checkPause(unsent);
this.unsent = checkPause(this.unsent);
if (!this.paused && !this.autoCommit) {
processCommits();
}
}
catch (WakeupException e) {
unsent = checkPause(unsent);
this.unsent = checkPause(this.unsent);
}
catch (Exception e) {
if (this.containerProperties.getErrorHandler() != null) {
@@ -449,9 +467,11 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
private void stopInvokerAndCommitManualAcks() {
long now = System.currentTimeMillis();
this.invoker.stop();
long remaining = this.containerProperties.getShutdownTimeout() + now - System.currentTimeMillis();
try {
this.listenerInvokerFuture.get(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
this.listenerInvokerFuture.get(remaining, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -552,7 +572,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private void invokeListener(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
final ConsumerRecord<K, V> record = iterator.next();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Processing " + record);
@@ -696,6 +716,8 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private final class ListenerInvoker implements SchedulingAwareRunnable {
private final CountDownLatch exitLatch = new CountDownLatch(1);
private volatile boolean active = true;
private volatile Thread executingThread;
@@ -728,14 +750,14 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
ListenerConsumer.this.logger.debug("Interrupt ignored");
}
}
if (!ListenerConsumer.this.isManualImmediateAck) {
if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
ListenerConsumer.this.consumer.wakeup();
}
}
}
finally {
this.active = false;
this.executingThread = null;
this.exitLatch.countDown();
}
}
@@ -745,8 +767,25 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
private void stop() {
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Stopping invoker");
}
this.active = false;
this.executingThread.interrupt();
try {
if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS)
&& this.executingThread != null) {
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Interrupting invoker");
}
this.executingThread.interrupt();
}
}
catch (InterruptedException e) {
if (this.executingThread != null) {
this.executingThread.interrupt();
}
Thread.currentThread().interrupt();
}
}
}

View File

@@ -21,9 +21,12 @@ import static org.mockito.BDDMockito.given;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -584,4 +587,61 @@ public class ConcurrentMessageListenerContainerTests {
logger.info("Stop ack on error");
}
@Test
public void testRebalanceWithSlowConsumer() throws Exception {
this.logger.info("Start auto");
Map<String, Object> props = KafkaTestUtils.consumerProps("test101", "false", embeddedKafka);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20000");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic1);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
ConcurrentMessageListenerContainer<Integer, String> container2 =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(8);
final Set<String> listenerThreadNames = Collections.synchronizedSet(new HashSet<String>());
List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
listenerThreadNames.add(Thread.currentThread().getName());
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
// ignore
}
receivedMessages.add(message.value());
listenerThreadNames.add(Thread.currentThread().getName());
latch.countDown();
});
container.setConcurrency(1);
container2.setConcurrency(1);
container.setBeanName("testAuto");
container2.setBeanName("testAuto2");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic1);
template.sendDefault(0, 0, "foo");
template.sendDefault(0, 2, "bar");
template.sendDefault(0, 0, "baz");
template.sendDefault(0, 2, "qux");
template.sendDefault(1, 2, "corge");
template.sendDefault(1, 2, "grault");
template.sendDefault(1, 2, "garply");
template.sendDefault(1, 2, "waldo");
template.flush();
container2.start();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(receivedMessages).containsOnlyOnce("foo", "bar", "baz", "qux", "corge", "grault", "garply", "waldo");
// all messages are received
assertThat(receivedMessages).hasSize(8);
// messages are received on separate threads
assertThat(listenerThreadNames.size()).isGreaterThanOrEqualTo(2);
container.stop();
container2.stop();
this.logger.info("Stop auto");
}
}

View File

@@ -3,7 +3,6 @@ log4j.rootCategory=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n
log4j.category.org.springframework.kafka=WARN
log4j.category.org.apache.kafka.clients=WARN
log4j.category.org.apache.kafka.common.network.Selector=ERROR

View File

@@ -164,6 +164,11 @@
<property name="illegalPattern" value="true" />
<property name="message" value="Trailing whitespace" />
</module>
<module name="Regexp">
<property name="format" value="System.(out|err).print" />
<property name="illegalPattern" value="true" />
<property name="message" value="System.out or .err" />
</module>
<!-- Whitespace -->
<module name="GenericWhitespace" />