GH-118: Rework Manual AckMode

Fixes #118

- Remove MANUAL_IMMEDIATE_SYNC - sync/async is controlled by the `syncCommits` property
- For MANUAL, wake the consumer thread at the end of the batch
- For MANUAL_IMMEDIATE, the consumer thread is woken directly from the Acknowledgement

Polishing

* Some simple code reformatting for `KafkaMessageListenerContainer`
* Move the `CountDownLatch` logic in the `KafkaMessageListenerContainerTests#testSlowConsumerCommitsAreProcessed()`
into the mock for `Consumer.commitSync()` since the count logic in the listener immediately after `ack.acknowledge()` doesn't guaranty that
`Consumer.commitSync()`will be called. That is because listener lives in one Thread, but `processCommits()` is done from a different `ListenerConsumer` Thread
This commit is contained in:
Gary Russell
2016-06-30 11:53:26 -04:00
committed by Artem Bilan
parent 3483387eb0
commit 51af726693
5 changed files with 76 additions and 60 deletions

View File

@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
@@ -84,21 +83,18 @@ public abstract class AbstractMessageListenerContainer<K, V>
COUNT_TIME,
/**
* Same as {@link #COUNT_TIME} except for pending manual acks.
* If no count or time are set, works as {@link #MANUAL_IMMEDIATE_SYNC}.
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}.
*/
MANUAL,
/**
* Call {@link Consumer#commitAsync()} immediately for pending acks.
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer is woken to
* immediately process the commit.
*/
MANUAL_IMMEDIATE,
/**
* Call {@link Consumer#commitSync()} immediately for pending acks.
*/
MANUAL_IMMEDIATE_SYNC
}
private final ContainerProperties containerProperties;

View File

@@ -241,13 +241,14 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private final boolean isManualAck = this.containerProperties.getAckMode().equals(AckMode.MANUAL);
private final boolean isManualImmediateAck =
this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE)
|| this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE_SYNC);
this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE);
private final boolean isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
private final boolean isRecordAck = this.containerProperties.getAckMode().equals(AckMode.RECORD);
private final boolean isBatchAck = this.containerProperties.getAckMode().equals(AckMode.BATCH);
private final BlockingQueue<ConsumerRecords<K, V>> recordsToProcess =
new LinkedBlockingQueue<>(this.containerProperties.getQueueDepth());
@@ -289,8 +290,8 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
// this will occur on the initial start on a subscription
if (!ListenerConsumer.this.autoCommit) {
if (ListenerConsumer.this.logger.isTraceEnabled()) {
ListenerConsumer.this.logger
.trace("Received partition revocation notification, and will stop the invoker.");
ListenerConsumer.this.logger.trace("Received partition revocation notification, " +
"and will stop the invoker.");
}
if (ListenerConsumer.this.listenerInvokerFuture != null) {
stopInvokerAndCommitManualAcks();
@@ -299,16 +300,16 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
else {
if (!CollectionUtils.isEmpty(partitions)) {
ListenerConsumer.this.logger.error(
"Invalid state: the invoker was not active, but the consumer had allocated partitions");
ListenerConsumer.this.logger.error("Invalid state: the invoker was not active, " +
"but the consumer had allocated partitions");
}
}
}
else {
if (ListenerConsumer.this.logger.isTraceEnabled()) {
ListenerConsumer.this.logger
.trace("Received partition revocation notification, but the container is in "
+ "autocommit mode, so transition will be handled by the consumer");
ListenerConsumer.this.logger.trace("Received partition revocation notification, " +
"but the container is in autocommit mode, " +
"so transition will be handled by the consumer");
}
}
getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions);
@@ -573,13 +574,13 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Committing: " + commits);
}
if (ListenerConsumer.this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
if (this.containerProperties.isSyncCommits()) {
ListenerConsumer.this.consumer.commitSync(commits);
}
else {
ListenerConsumer.this.consumer.commitAsync(commits,
ListenerConsumer.this.commitCallback);
}
else { // MANUAL_IMMEDIATE_SYNC
ListenerConsumer.this.consumer.commitSync(commits);
}
}
private void invokeListener(final ConsumerRecords<K, V> records) {
@@ -591,13 +592,14 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
try {
if (this.acknowledgingMessageListener != null) {
this.acknowledgingMessageListener.onMessage(record, new ConsumerAcknowledgment(record));
this.acknowledgingMessageListener.onMessage(record,
new ConsumerAcknowledgment(record, this.isManualImmediateAck));
}
else {
this.listener.onMessage(record);
}
this.acks.add(record);
if (this.isManualImmediateAck || this.isRecordAck) {
if (this.isRecordAck) {
this.consumer.wakeup();
}
}
@@ -613,6 +615,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
}
}
if (this.isManualAck || this.isBatchAck) {
this.consumer.wakeup();
}
}
private void processCommits() {
@@ -625,8 +630,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
updatePendingOffsets();
}
boolean countExceeded = this.count >= this.containerProperties.getAckCount();
if (ackMode.equals(AckMode.BATCH) || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
if (this.logger.isDebugEnabled()) {
if (this.isManualAck || this.isBatchAck
|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {
if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
this.logger.debug("Committing in AckMode.COUNT because count " + this.count
+ " exceeds configured limit of " + this.containerProperties.getAckCount());
}
@@ -638,25 +644,24 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
boolean elapsed = now - this.last > this.containerProperties.getAckTime();
if (ackMode.equals(AckMode.TIME) && elapsed) {
if (this.logger.isDebugEnabled()) {
this.logger
.debug("Committing in AckMode.TIME " +
this.logger.debug("Committing in AckMode.TIME " +
"because time elapsed exceeds configured limit of " +
this.containerProperties.getAckTime());
}
commitIfNecessary();
this.last = now;
}
else if ((ackMode.equals(AckMode.COUNT_TIME) || this.isManualAck) && (elapsed || countExceeded)) {
else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
if (this.logger.isDebugEnabled()) {
if (elapsed) {
this.logger.debug("Committing in AckMode." + ackMode.name() +
" because time elapsed exceeds configured limit of " +
this.logger.debug("Committing in AckMode.COUNT_TIME " +
"because time elapsed exceeds configured limit of " +
this.containerProperties.getAckTime());
}
else {
this.logger.debug("Committing in AckMode." + ackMode.name() + " because count "
+ this.count + " exceeds configured limit of"
+ this.containerProperties.getAckCount());
this.logger.debug("Committing in AckMode.COUNT_TIME " +
"because count " + this.count + " exceeds configured limit of" +
this.containerProperties.getAckCount());
}
}
@@ -823,8 +828,11 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private final ConsumerRecord<K, V> record;
private ConsumerAcknowledgment(ConsumerRecord<K, V> record) {
private final boolean immediate;
private ConsumerAcknowledgment(ConsumerRecord<K, V> record, boolean immediate) {
this.record = record;
this.immediate = immediate;
}
@Override
@@ -836,7 +844,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
Thread.currentThread().interrupt();
throw new KafkaException("Interrupted while queuing ack for " + this.record, e);
}
ListenerConsumer.this.consumer.wakeup();
if (this.immediate) {
ListenerConsumer.this.consumer.wakeup();
}
}
@Override

View File

@@ -352,10 +352,10 @@ public class ConcurrentMessageListenerContainerTests {
@Test
public void testManualCommit() throws Exception {
testManualCommitGuts(AckMode.MANUAL, topic4);
testManualCommitGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic5);
testManualCommitGuts(AckMode.MANUAL_IMMEDIATE, topic5);
// to be sure the commits worked ok so run the tests again and the second tests start at the committed offset.
testManualCommitGuts(AckMode.MANUAL, topic4);
testManualCommitGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic5);
testManualCommitGuts(AckMode.MANUAL_IMMEDIATE, topic5);
}
private void testManualCommitGuts(AckMode ackMode, String topic) throws Exception {
@@ -442,7 +442,7 @@ public class ConcurrentMessageListenerContainerTests {
@Test
public void testManualCommitSyncExisting() throws Exception {
this.logger.info("Start MANUAL_IMMEDIATE_SYNC with Existing");
this.logger.info("Start MANUAL_IMMEDIATE with Existing");
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
@@ -456,6 +456,7 @@ public class ConcurrentMessageListenerContainerTests {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
ContainerProperties containerProps = new ContainerProperties(topic8);
containerProps.setSyncCommits(true);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(8);
@@ -467,7 +468,7 @@ public class ConcurrentMessageListenerContainerTests {
latch.countDown();
});
container.setConcurrency(1);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
container.setBeanName("testManualExisting");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
@@ -479,7 +480,7 @@ public class ConcurrentMessageListenerContainerTests {
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(bitSet.cardinality()).isEqualTo(8);
container.stop();
this.logger.info("Stop MANUAL_IMMEDIATE_SYNC with Existing");
this.logger.info("Stop MANUAL_IMMEDIATE with Existing");
}
@Test
@@ -513,7 +514,6 @@ public class ConcurrentMessageListenerContainerTests {
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
container.setConcurrency(3);
container.start();
@SuppressWarnings("unchecked")
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
"containers", List.class);
assertThat(containers.size()).isEqualTo(3);

View File

@@ -17,8 +17,8 @@
package org.springframework.kafka.listener;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -141,9 +141,9 @@ public class KafkaMessageListenerContainerTests {
@Test
public void testSlowListenerManualCommit() throws Exception {
testSlowListenerManualGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic2);
testSlowListenerManualGuts(AckMode.MANUAL_IMMEDIATE, topic2);
// to be sure the commits worked ok so run the tests again and the second tests start at the committed offset.
testSlowListenerManualGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic2);
testSlowListenerManualGuts(AckMode.MANUAL_IMMEDIATE, topic2);
}
private void testSlowListenerManualGuts(AckMode ackMode, String topic) throws Exception {
@@ -151,6 +151,7 @@ public class KafkaMessageListenerContainerTests {
Map<String, Object> props = KafkaTestUtils.consumerProps("slow2", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
ContainerProperties containerProps = new ContainerProperties(topic);
containerProps.setSyncCommits(true);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(6);
@@ -208,10 +209,11 @@ public class KafkaMessageListenerContainerTests {
ContainerProperties containerProps = new ContainerProperties(topic5);
containerProps.setAckCount(1);
containerProps.setPauseAfter(100);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC);
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
containerProps.setSyncCommits(true);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch latch = new CountDownLatch(3);
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
logger.info("slow: " + message);
try {
@@ -221,12 +223,22 @@ public class KafkaMessageListenerContainerTests {
Thread.currentThread().interrupt();
}
ack.acknowledge();
latch.countDown();
});
container.setBeanName("testSlow");
container.start();
Consumer<?, ?> consumer = spyOnConsumer(container);
final CountDownLatch latch = new CountDownLatch(3);
willAnswer(invocation -> {
latch.countDown();
return invocation.callRealMethod();
}).given(consumer)
.commitSync(any());
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
@@ -244,7 +256,6 @@ public class KafkaMessageListenerContainerTests {
// Verify that commitSync is called when paused
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
verify(consumer, atLeastOnce()).pause(any(TopicPartition.class), any(TopicPartition.class));
verify(consumer, atLeast(2)).commitSync(any());
verify(consumer, atLeastOnce()).resume(any(TopicPartition.class), any(TopicPartition.class));
container.stop();
}

View File

@@ -209,22 +209,21 @@ If it is false, the containers support the following `AckMode` s.
The consumer `poll()` method will return one or more `ConsumerRecords`; the `MessageListener` is called for each record;
the following describes the action taken by the container for each `AckMode` :
- RECORD - call `commitAsync()` when the listener returns after processing the record.
- BATCH - call `commitAsync()` when all the records returned by the `poll()` have been processed.
- TIME - call `commitAsync()` when all the records returned by the `poll()` have been processed as long as the `ackTime`
- RECORD - commit the offset when the listener returns after processing the record.
- BATCH - commit the offset when all the records returned by the `poll()` have been processed.
- TIME - commit the offset when all the records returned by the `poll()` have been processed as long as the `ackTime`
since the last commit has been exceeded.
- COUNT - call `commitAsync()` when all the records returned by the `poll()` have been processed as long as `ackCount`
- COUNT - commit the offset when all the records returned by the `poll()` have been processed as long as `ackCount`
records have been received since the last commit.
- COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
- MANUAL - the message listener (`AcknowledgingMessageListener`) is responsible to `acknowledge()` the `Acknowledgment`;
after which, the same semantics as `COUNT_TIME` are applied.
- MANUAL_IMMEDIATE - call `commitAsync()` immediately when the `Acknowledgment.acknowledge()` method is called by the
listener - must be executed on the container's thread.
- MANUAL_IMMEDIATE_SYNC - call `commitSync()` immediately when the `Acknowledgment.acknowledge()` method is called by
the listener - must be executed on the container's thread.
after which, the same semantics as `BATCH` are applied.
- MANUAL_IMMEDIATE - commit the offset immediately when the `Acknowledgment.acknowledge()` method is called by the
listener.
NOTE: `MANUAL`, `MANUAL_IMMEDIATE`, and `MANUAL_IMMEDIATE_SYNC` require the listener to be an
`AcknowledgingMessageListener`.
NOTE: `MANUAL`, and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener`.
The `commitSync()` or `commitAsync()` method on the consumer is used, depending on the `syncCommits` container property.
[source, java]
----