diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index b13ac5a6..171e7c73 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; @@ -84,21 +83,18 @@ public abstract class AbstractMessageListenerContainer COUNT_TIME, /** - * Same as {@link #COUNT_TIME} except for pending manual acks. - * If no count or time are set, works as {@link #MANUAL_IMMEDIATE_SYNC}. + * User takes responsibility for acks using an + * {@link AcknowledgingMessageListener}. */ MANUAL, /** - * Call {@link Consumer#commitAsync()} immediately for pending acks. + * User takes responsibility for acks using an + * {@link AcknowledgingMessageListener}. The consumer is woken to + * immediately process the commit. */ MANUAL_IMMEDIATE, - /** - * Call {@link Consumer#commitSync()} immediately for pending acks. - */ - MANUAL_IMMEDIATE_SYNC - } private final ContainerProperties containerProperties; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 6d2bfffd..98515b62 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -241,13 +241,14 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener private final boolean isManualAck = this.containerProperties.getAckMode().equals(AckMode.MANUAL); private final boolean isManualImmediateAck = - this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE) - || this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE_SYNC); + this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE); private final boolean isAnyManualAck = this.isManualAck || this.isManualImmediateAck; private final boolean isRecordAck = this.containerProperties.getAckMode().equals(AckMode.RECORD); + private final boolean isBatchAck = this.containerProperties.getAckMode().equals(AckMode.BATCH); + private final BlockingQueue> recordsToProcess = new LinkedBlockingQueue<>(this.containerProperties.getQueueDepth()); @@ -289,8 +290,8 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener // this will occur on the initial start on a subscription if (!ListenerConsumer.this.autoCommit) { if (ListenerConsumer.this.logger.isTraceEnabled()) { - ListenerConsumer.this.logger - .trace("Received partition revocation notification, and will stop the invoker."); + ListenerConsumer.this.logger.trace("Received partition revocation notification, " + + "and will stop the invoker."); } if (ListenerConsumer.this.listenerInvokerFuture != null) { stopInvokerAndCommitManualAcks(); @@ -299,16 +300,16 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } else { if (!CollectionUtils.isEmpty(partitions)) { - ListenerConsumer.this.logger.error( - "Invalid state: the invoker was not active, but the consumer had allocated partitions"); + ListenerConsumer.this.logger.error("Invalid state: the invoker was not active, " + + "but the consumer had allocated partitions"); } } } else { if (ListenerConsumer.this.logger.isTraceEnabled()) { - ListenerConsumer.this.logger - .trace("Received partition revocation notification, but the container is in " - + "autocommit mode, so transition will be handled by the consumer"); + ListenerConsumer.this.logger.trace("Received partition revocation notification, " + + "but the container is in autocommit mode, " + + "so transition will be handled by the consumer"); } } getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions); @@ -573,13 +574,13 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener if (ListenerConsumer.this.logger.isDebugEnabled()) { ListenerConsumer.this.logger.debug("Committing: " + commits); } - if (ListenerConsumer.this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) { + if (this.containerProperties.isSyncCommits()) { + ListenerConsumer.this.consumer.commitSync(commits); + } + else { ListenerConsumer.this.consumer.commitAsync(commits, ListenerConsumer.this.commitCallback); } - else { // MANUAL_IMMEDIATE_SYNC - ListenerConsumer.this.consumer.commitSync(commits); - } } private void invokeListener(final ConsumerRecords records) { @@ -591,13 +592,14 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } try { if (this.acknowledgingMessageListener != null) { - this.acknowledgingMessageListener.onMessage(record, new ConsumerAcknowledgment(record)); + this.acknowledgingMessageListener.onMessage(record, + new ConsumerAcknowledgment(record, this.isManualImmediateAck)); } else { this.listener.onMessage(record); } this.acks.add(record); - if (this.isManualImmediateAck || this.isRecordAck) { + if (this.isRecordAck) { this.consumer.wakeup(); } } @@ -613,6 +615,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } } } + if (this.isManualAck || this.isBatchAck) { + this.consumer.wakeup(); + } } private void processCommits() { @@ -625,8 +630,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener updatePendingOffsets(); } boolean countExceeded = this.count >= this.containerProperties.getAckCount(); - if (ackMode.equals(AckMode.BATCH) || (ackMode.equals(AckMode.COUNT) && countExceeded)) { - if (this.logger.isDebugEnabled()) { + if (this.isManualAck || this.isBatchAck + || (ackMode.equals(AckMode.COUNT) && countExceeded)) { + if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) { this.logger.debug("Committing in AckMode.COUNT because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount()); } @@ -638,25 +644,24 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener boolean elapsed = now - this.last > this.containerProperties.getAckTime(); if (ackMode.equals(AckMode.TIME) && elapsed) { if (this.logger.isDebugEnabled()) { - this.logger - .debug("Committing in AckMode.TIME " + + this.logger.debug("Committing in AckMode.TIME " + "because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime()); } commitIfNecessary(); this.last = now; } - else if ((ackMode.equals(AckMode.COUNT_TIME) || this.isManualAck) && (elapsed || countExceeded)) { + else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) { if (this.logger.isDebugEnabled()) { if (elapsed) { - this.logger.debug("Committing in AckMode." + ackMode.name() + - " because time elapsed exceeds configured limit of " + + this.logger.debug("Committing in AckMode.COUNT_TIME " + + "because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime()); } else { - this.logger.debug("Committing in AckMode." + ackMode.name() + " because count " - + this.count + " exceeds configured limit of" - + this.containerProperties.getAckCount()); + this.logger.debug("Committing in AckMode.COUNT_TIME " + + "because count " + this.count + " exceeds configured limit of" + + this.containerProperties.getAckCount()); } } @@ -823,8 +828,11 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener private final ConsumerRecord record; - private ConsumerAcknowledgment(ConsumerRecord record) { + private final boolean immediate; + + private ConsumerAcknowledgment(ConsumerRecord record, boolean immediate) { this.record = record; + this.immediate = immediate; } @Override @@ -836,7 +844,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener Thread.currentThread().interrupt(); throw new KafkaException("Interrupted while queuing ack for " + this.record, e); } - ListenerConsumer.this.consumer.wakeup(); + if (this.immediate) { + ListenerConsumer.this.consumer.wakeup(); + } } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 43f870f6..347e89eb 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -352,10 +352,10 @@ public class ConcurrentMessageListenerContainerTests { @Test public void testManualCommit() throws Exception { testManualCommitGuts(AckMode.MANUAL, topic4); - testManualCommitGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic5); + testManualCommitGuts(AckMode.MANUAL_IMMEDIATE, topic5); // to be sure the commits worked ok so run the tests again and the second tests start at the committed offset. testManualCommitGuts(AckMode.MANUAL, topic4); - testManualCommitGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic5); + testManualCommitGuts(AckMode.MANUAL_IMMEDIATE, topic5); } private void testManualCommitGuts(AckMode ackMode, String topic) throws Exception { @@ -442,7 +442,7 @@ public class ConcurrentMessageListenerContainerTests { @Test public void testManualCommitSyncExisting() throws Exception { - this.logger.info("Start MANUAL_IMMEDIATE_SYNC with Existing"); + this.logger.info("Start MANUAL_IMMEDIATE with Existing"); Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); @@ -456,6 +456,7 @@ public class ConcurrentMessageListenerContainerTests { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic8); + containerProps.setSyncCommits(true); ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(8); @@ -467,7 +468,7 @@ public class ConcurrentMessageListenerContainerTests { latch.countDown(); }); container.setConcurrency(1); - containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC); + containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE); container.setBeanName("testManualExisting"); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); @@ -479,7 +480,7 @@ public class ConcurrentMessageListenerContainerTests { assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(bitSet.cardinality()).isEqualTo(8); container.stop(); - this.logger.info("Stop MANUAL_IMMEDIATE_SYNC with Existing"); + this.logger.info("Stop MANUAL_IMMEDIATE with Existing"); } @Test @@ -513,7 +514,6 @@ public class ConcurrentMessageListenerContainerTests { containerProps.setMessageListener((MessageListener) message -> { }); container.setConcurrency(3); container.start(); - @SuppressWarnings("unchecked") List> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class); assertThat(containers.size()).isEqualTo(3); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index c9b6445f..55959d89 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -17,8 +17,8 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -141,9 +141,9 @@ public class KafkaMessageListenerContainerTests { @Test public void testSlowListenerManualCommit() throws Exception { - testSlowListenerManualGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic2); + testSlowListenerManualGuts(AckMode.MANUAL_IMMEDIATE, topic2); // to be sure the commits worked ok so run the tests again and the second tests start at the committed offset. - testSlowListenerManualGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic2); + testSlowListenerManualGuts(AckMode.MANUAL_IMMEDIATE, topic2); } private void testSlowListenerManualGuts(AckMode ackMode, String topic) throws Exception { @@ -151,6 +151,7 @@ public class KafkaMessageListenerContainerTests { Map props = KafkaTestUtils.consumerProps("slow2", "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props); ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setSyncCommits(true); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); final CountDownLatch latch = new CountDownLatch(6); @@ -208,10 +209,11 @@ public class KafkaMessageListenerContainerTests { ContainerProperties containerProps = new ContainerProperties(topic5); containerProps.setAckCount(1); containerProps.setPauseAfter(100); - containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC); + containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE); + containerProps.setSyncCommits(true); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); - final CountDownLatch latch = new CountDownLatch(3); + containerProps.setMessageListener((AcknowledgingMessageListener) (message, ack) -> { logger.info("slow: " + message); try { @@ -221,12 +223,22 @@ public class KafkaMessageListenerContainerTests { Thread.currentThread().interrupt(); } ack.acknowledge(); - latch.countDown(); }); container.setBeanName("testSlow"); container.start(); Consumer consumer = spyOnConsumer(container); + + final CountDownLatch latch = new CountDownLatch(3); + + willAnswer(invocation -> { + + latch.countDown(); + return invocation.callRealMethod(); + + }).given(consumer) + .commitSync(any()); + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); @@ -244,7 +256,6 @@ public class KafkaMessageListenerContainerTests { // Verify that commitSync is called when paused assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); verify(consumer, atLeastOnce()).pause(any(TopicPartition.class), any(TopicPartition.class)); - verify(consumer, atLeast(2)).commitSync(any()); verify(consumer, atLeastOnce()).resume(any(TopicPartition.class), any(TopicPartition.class)); container.stop(); } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index f47f645c..93179ffd 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -209,22 +209,21 @@ If it is false, the containers support the following `AckMode` s. The consumer `poll()` method will return one or more `ConsumerRecords`; the `MessageListener` is called for each record; the following describes the action taken by the container for each `AckMode` : -- RECORD - call `commitAsync()` when the listener returns after processing the record. -- BATCH - call `commitAsync()` when all the records returned by the `poll()` have been processed. -- TIME - call `commitAsync()` when all the records returned by the `poll()` have been processed as long as the `ackTime` +- RECORD - commit the offset when the listener returns after processing the record. +- BATCH - commit the offset when all the records returned by the `poll()` have been processed. +- TIME - commit the offset when all the records returned by the `poll()` have been processed as long as the `ackTime` since the last commit has been exceeded. -- COUNT - call `commitAsync()` when all the records returned by the `poll()` have been processed as long as `ackCount` +- COUNT - commit the offset when all the records returned by the `poll()` have been processed as long as `ackCount` records have been received since the last commit. - COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true. - MANUAL - the message listener (`AcknowledgingMessageListener`) is responsible to `acknowledge()` the `Acknowledgment`; -after which, the same semantics as `COUNT_TIME` are applied. -- MANUAL_IMMEDIATE - call `commitAsync()` immediately when the `Acknowledgment.acknowledge()` method is called by the -listener - must be executed on the container's thread. -- MANUAL_IMMEDIATE_SYNC - call `commitSync()` immediately when the `Acknowledgment.acknowledge()` method is called by -the listener - must be executed on the container's thread. +after which, the same semantics as `BATCH` are applied. +- MANUAL_IMMEDIATE - commit the offset immediately when the `Acknowledgment.acknowledge()` method is called by the +listener. -NOTE: `MANUAL`, `MANUAL_IMMEDIATE`, and `MANUAL_IMMEDIATE_SYNC` require the listener to be an -`AcknowledgingMessageListener`. +NOTE: `MANUAL`, and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener`. + +The `commitSync()` or `commitAsync()` method on the consumer is used, depending on the `syncCommits` container property. [source, java] ----