diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index ca78035e..d3b83982 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 323f0727..67157a31 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Mon May 16 18:24:00 EDT 2016 +#Mon Jun 20 11:56:44 EDT 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 5367794f..56922357 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -219,8 +217,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener private final Consumer consumer; - private final ConcurrentMap> manualOffsets = new ConcurrentHashMap<>(); - private final Map> offsets = new HashMap<>(); private final MessageListener listener; @@ -309,12 +305,17 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener public void onPartitionsAssigned(Collection partitions) { ListenerConsumer.this.assignedPartitions = partitions; if (!ListenerConsumer.this.autoCommit) { - // Commit initial positions - while this is generally redundant + // Commit initial positions - this is generally redundant but // it protects us from the case when another consumer starts + // and rebalance would cause it to reset at the end + // see https://github.com/spring-projects/spring-kafka/issues/110 Map offsets = new HashMap<>(); for (TopicPartition partition : partitions) { offsets.put(partition, new OffsetAndMetadata(consumer.position(partition))); } + if (ListenerConsumer.this.logger.isDebugEnabled()) { + ListenerConsumer.this.logger.debug("Committing: " + offsets); + } if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) { ListenerConsumer.this.consumer.commitSync(offsets); } @@ -398,7 +399,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener if (this.containerProperties.getIdleEventInterval() != null) { lastReceive = System.currentTimeMillis(); } - handleManualAcks(); // if the container is set to auto-commit, then execute in the // same thread // otherwise send to the buffering queue @@ -487,8 +487,6 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener finally { this.listenerInvokerFuture = null; } - // handle the last manual acks, after the listeners have closed - handleManualAcks(); processCommits(); if (this.offsets.size() > 0) { // we always commit after stopping the invoker @@ -551,7 +549,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } } else { - updateManualOffset(record); + addOffset(record); } } @@ -605,6 +603,7 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } private void processCommits() { + handleManualAcks(); this.count += this.acks.size(); long now; AckMode ackMode = this.containerProperties.getAckMode(); @@ -614,6 +613,10 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } boolean countExceeded = this.count >= this.containerProperties.getAckCount(); if (ackMode.equals(AckMode.BATCH) || ackMode.equals(AckMode.COUNT) && countExceeded) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Committing in AckMode.COUNT because count " + this.count + + " exceeds configured limit of" + this.containerProperties.getAckCount()); + } commitIfNecessary(); this.count = 0; } @@ -621,11 +624,27 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener now = System.currentTimeMillis(); boolean elapsed = now - this.last > this.containerProperties.getAckTime(); if (ackMode.equals(AckMode.TIME) && elapsed) { + if (this.logger.isDebugEnabled()) { + this.logger + .debug("Committing in AckMode.TIME because time elapsed exceeds configured limit of " + + this.containerProperties.getAckTime()); + } commitIfNecessary(); this.last = now; } - else if ((ackMode.equals(AckMode.COUNT_TIME) || this.isManualAck) - && (elapsed || countExceeded)) { + else if ((ackMode.equals(AckMode.COUNT_TIME) || this.isManualAck) && (elapsed || countExceeded)) { + if (this.logger.isDebugEnabled()) { + if (elapsed) { + this.logger.debug("Committing in AckMode." + ackMode.name() + " because time elapsed " + + "exceeds configured limit of " + this.containerProperties.getAckTime()); + } + else { + this.logger.debug("Committing in AckMode." + ackMode.name() + " because count " + + this.count + " exceeds configured limit of" + + this.containerProperties.getAckCount()); + } + } + commitIfNecessary(); this.last = now; this.count = 0; @@ -674,39 +693,22 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener this.offsets.get(record.topic()).put(record.partition(), record.offset()); } - private void updateManualOffset(ConsumerRecord record) { - if (!this.manualOffsets.containsKey(record.topic())) { - this.manualOffsets.putIfAbsent(record.topic(), new ConcurrentHashMap()); - } - this.manualOffsets.get(record.topic()).put(record.partition(), record.offset()); - } - private void commitIfNecessary() { Map commits = new HashMap<>(); - if (this.isManualAck) { - for (Entry> entry : this.manualOffsets.entrySet()) { - Iterator> iterator = entry.getValue().entrySet().iterator(); - while (iterator.hasNext()) { - Entry offset = iterator.next(); - commits.put(new TopicPartition(entry.getKey(), offset.getKey()), - new OffsetAndMetadata(offset.getValue() + 1)); - iterator.remove(); - } - } - } - else { - for (Entry> entry : this.offsets.entrySet()) { - for (Entry offset : entry.getValue().entrySet()) { - commits.put(new TopicPartition(entry.getKey(), offset.getKey()), - new OffsetAndMetadata(offset.getValue() + 1)); - } + for (Entry> entry : this.offsets.entrySet()) { + for (Entry offset : entry.getValue().entrySet()) { + commits.put(new TopicPartition(entry.getKey(), offset.getKey()), + new OffsetAndMetadata(offset.getValue() + 1)); } } this.offsets.clear(); if (this.logger.isDebugEnabled()) { - this.logger.debug("Committing: " + commits); + this.logger.debug("Commit list: " + commits); } if (!commits.isEmpty()) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Committing: " + commits); + } try { if (this.containerProperties.isSyncCommits()) { this.consumer.commitSync(commits); @@ -717,6 +719,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } catch (WakeupException e) { // ignore - not polling + if (this.logger.isDebugEnabled()) { + this.logger.debug("Woken up during commit"); + } } } } @@ -793,6 +798,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } Thread.currentThread().interrupt(); } + if (ListenerConsumer.this.logger.isDebugEnabled()) { + ListenerConsumer.this.logger.debug("Invoker stopped"); + } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 30a761fc..c9b6445f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -18,9 +18,10 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.util.BitSet; @@ -207,7 +208,7 @@ public class KafkaMessageListenerContainerTests { ContainerProperties containerProps = new ContainerProperties(topic5); containerProps.setAckCount(1); containerProps.setPauseAfter(100); - containerProps.setAckMode(AckMode.MANUAL); + containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(3); @@ -232,22 +233,68 @@ public class KafkaMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic5); - template.sendDefault(0, "foo"); - template.sendDefault(2, "bar"); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 2, "bar"); template.flush(); Thread.sleep(300); - template.sendDefault(0, "fiz"); - template.sendDefault(2, "buz"); + template.sendDefault(0, 0, "fiz"); + template.sendDefault(1, 2, "buz"); template.flush(); // Verify that commitSync is called when paused assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); verify(consumer, atLeastOnce()).pause(any(TopicPartition.class), any(TopicPartition.class)); - verify(consumer, atMost(2)).commitSync(any()); + verify(consumer, atLeast(2)).commitSync(any()); verify(consumer, atLeastOnce()).resume(any(TopicPartition.class), any(TopicPartition.class)); container.stop(); } + @Test + public void testCommitsAreFlushedOnStop() throws Exception { + Map props = KafkaTestUtils.consumerProps("flushedOnStop", "false", embeddedKafka); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProps = new ContainerProperties(topic5); + containerProps.setAckCount(1); + containerProps.setPauseAfter(100); + // set large values, ensuring that commits don't happen before `stop()` + containerProps.setAckTime(20000); + containerProps.setAckCount(20000); + containerProps.setAckMode(AckMode.COUNT_TIME); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + final CountDownLatch latch = new CountDownLatch(4); + containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { + logger.info("slow: " + message); + ack.acknowledge(); + latch.countDown(); + }); + container.setBeanName("testManualFlushed"); + + container.start(); + Consumer consumer = spyOnConsumer(container); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic5); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 2, "bar"); + template.flush(); + Thread.sleep(300); + template.sendDefault(0, 0, "fiz"); + template.sendDefault(1, 2, "buz"); + template.flush(); + + // Verify that commitSync is called when paused + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); + // Verify that just the initial commit is processed before stop + verify(consumer, times(1)).commitSync(any()); + container.stop(); + // Verify that a commit has been made on stop + verify(consumer, times(2)).commitSync(any()); + } + @Test public void testSlowConsumerWithException() throws Exception { logger.info("Start " + this.testName.getMethodName());