diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java index 2c6d0f8e..e701e3e2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java @@ -32,4 +32,5 @@ public interface ConsumerFactory { boolean isAutoCommit(); + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 2580cf6b..b6477636 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -247,6 +248,8 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener private volatile Collection definedPartitions; + private ConsumerRecords unsent; + private volatile Collection assignedPartitions; private int count; @@ -268,7 +271,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener long recentOffset) { Assert.state(!this.isAnyManualAck || !this.autoCommit, "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode()); - Consumer consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(); + final Consumer consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(); ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() { @@ -284,6 +287,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener if (ListenerConsumer.this.listenerInvokerFuture != null) { stopInvokerAndCommitManualAcks(); ListenerConsumer.this.recordsToProcess.clear(); + ListenerConsumer.this.unsent = null; } else { if (!CollectionUtils.isEmpty(partitions)) { @@ -305,6 +309,21 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener @Override public void onPartitionsAssigned(Collection partitions) { ListenerConsumer.this.assignedPartitions = partitions; + if (!ListenerConsumer.this.autoCommit) { + // Commit initial positions - while this is generally redundant + // it protects us from the case when another consumer starts + Map offsets = new HashMap<>(); + for (TopicPartition partition : partitions) { + offsets.put(partition, new OffsetAndMetadata(consumer.position(partition))); + } + if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) { + ListenerConsumer.this.consumer.commitSync(offsets); + } + else { + ListenerConsumer.this.consumer.commitAsync(offsets, + KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback()); + } + } // We will not start the invoker thread if we are in autocommit mode, // as we will execute synchronously then // We will not start the invoker thread if the container is stopped @@ -363,7 +382,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener startInvoker(); } } - ConsumerRecords unsent = null; long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive; while (isRunning()) { @@ -394,7 +412,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener this.consumer.pause(this.assignedPartitions .toArray(new TopicPartition[this.assignedPartitions.size()])); this.paused = true; - unsent = records; + this.unsent = records; } } } @@ -409,13 +427,13 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } } } - unsent = checkPause(unsent); + this.unsent = checkPause(this.unsent); if (!this.paused && !this.autoCommit) { processCommits(); } } catch (WakeupException e) { - unsent = checkPause(unsent); + this.unsent = checkPause(this.unsent); } catch (Exception e) { if (this.containerProperties.getErrorHandler() != null) { @@ -449,9 +467,11 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } private void stopInvokerAndCommitManualAcks() { + long now = System.currentTimeMillis(); this.invoker.stop(); + long remaining = this.containerProperties.getShutdownTimeout() + now - System.currentTimeMillis(); try { - this.listenerInvokerFuture.get(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); + this.listenerInvokerFuture.get(remaining, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -552,7 +572,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener private void invokeListener(final ConsumerRecords records) { Iterator> iterator = records.iterator(); - while (iterator.hasNext()) { + while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) { final ConsumerRecord record = iterator.next(); if (this.logger.isTraceEnabled()) { this.logger.trace("Processing " + record); @@ -696,6 +716,8 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener private final class ListenerInvoker implements SchedulingAwareRunnable { + private final CountDownLatch exitLatch = new CountDownLatch(1); + private volatile boolean active = true; private volatile Thread executingThread; @@ -728,14 +750,14 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener ListenerConsumer.this.logger.debug("Interrupt ignored"); } } - if (!ListenerConsumer.this.isManualImmediateAck) { + if (!ListenerConsumer.this.isManualImmediateAck && this.active) { ListenerConsumer.this.consumer.wakeup(); } } } finally { this.active = false; - this.executingThread = null; + this.exitLatch.countDown(); } } @@ -745,8 +767,25 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } private void stop() { + if (ListenerConsumer.this.logger.isDebugEnabled()) { + ListenerConsumer.this.logger.debug("Stopping invoker"); + } this.active = false; - this.executingThread.interrupt(); + try { + if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS) + && this.executingThread != null) { + if (ListenerConsumer.this.logger.isDebugEnabled()) { + ListenerConsumer.this.logger.debug("Interrupting invoker"); + } + this.executingThread.interrupt(); + } + } + catch (InterruptedException e) { + if (this.executingThread != null) { + this.executingThread.interrupt(); + } + Thread.currentThread().interrupt(); + } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index d5bfe325..6c0c7e8c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -21,9 +21,12 @@ import static org.mockito.BDDMockito.given; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; +import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -584,4 +587,61 @@ public class ConcurrentMessageListenerContainerTests { logger.info("Stop ack on error"); } + @Test + public void testRebalanceWithSlowConsumer() throws Exception { + this.logger.info("Start auto"); + Map props = KafkaTestUtils.consumerProps("test101", "false", embeddedKafka); + props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20000"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProps = new ContainerProperties(topic1); + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + ConcurrentMessageListenerContainer container2 = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(8); + final Set listenerThreadNames = Collections.synchronizedSet(new HashSet()); + List receivedMessages = Collections.synchronizedList(new ArrayList<>()); + containerProps.setMessageListener((MessageListener) message -> { + listenerThreadNames.add(Thread.currentThread().getName()); + try { + Thread.sleep(2000); + } + catch (InterruptedException e) { + // ignore + } + receivedMessages.add(message.value()); + listenerThreadNames.add(Thread.currentThread().getName()); + latch.countDown(); + }); + container.setConcurrency(1); + container2.setConcurrency(1); + container.setBeanName("testAuto"); + container2.setBeanName("testAuto2"); + container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic1); + template.sendDefault(0, 0, "foo"); + template.sendDefault(0, 2, "bar"); + template.sendDefault(0, 0, "baz"); + template.sendDefault(0, 2, "qux"); + template.sendDefault(1, 2, "corge"); + template.sendDefault(1, 2, "grault"); + template.sendDefault(1, 2, "garply"); + template.sendDefault(1, 2, "waldo"); + template.flush(); + container2.start(); + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(receivedMessages).containsOnlyOnce("foo", "bar", "baz", "qux", "corge", "grault", "garply", "waldo"); + // all messages are received + assertThat(receivedMessages).hasSize(8); + // messages are received on separate threads + assertThat(listenerThreadNames.size()).isGreaterThanOrEqualTo(2); + container.stop(); + container2.stop(); + this.logger.info("Stop auto"); + } + } diff --git a/spring-kafka/src/test/resources/log4j.properties b/spring-kafka/src/test/resources/log4j.properties index 61c1db58..489abf7e 100644 --- a/spring-kafka/src/test/resources/log4j.properties +++ b/spring-kafka/src/test/resources/log4j.properties @@ -3,7 +3,6 @@ log4j.rootCategory=WARN, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n - log4j.category.org.springframework.kafka=WARN log4j.category.org.apache.kafka.clients=WARN log4j.category.org.apache.kafka.common.network.Selector=ERROR diff --git a/src/checkstyle/checkstyle.xml b/src/checkstyle/checkstyle.xml index 0b6ccaa9..d07aef79 100644 --- a/src/checkstyle/checkstyle.xml +++ b/src/checkstyle/checkstyle.xml @@ -164,6 +164,11 @@ + + + + +