GH-146: Fix TopicPartition Negative Reset

Fixes #146

When using reset relative to the current end, ensure the seek value is
not less than zero.

Add `consumer.position()` to error log message to reflect reality on `seek()`failure
This commit is contained in:
Gary Russell
2016-07-12 11:44:36 +01:00
committed by Artem Bilan
parent d04b53a7ab
commit 6d80e0e023
2 changed files with 13 additions and 7 deletions

View File

@@ -691,12 +691,18 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
if (offset < 0) {
this.consumer.seekToEnd(topicPartition);
newOffset = this.consumer.position(topicPartition) + offset;
newOffset = Math.max(0, this.consumer.position(topicPartition) + offset);
}
this.consumer.seek(topicPartition, newOffset);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Reset " + topicPartition + " to offset " + newOffset);
try {
this.consumer.seek(topicPartition, newOffset);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Reset " + topicPartition + " to offset " + newOffset);
}
}
catch (Exception e) {
logger.error("Failed to set initial offset for " + topicPartition
+ " at " + newOffset + ". Positioned to " + this.consumer.position(topicPartition), e);
}
}
}

View File

@@ -305,13 +305,13 @@ public class ConcurrentMessageListenerContainerTests {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
cf = new DefaultKafkaConsumerFactory<>(props);
// reset minus one
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1L);
// reset beginning for part 0, minus one for part 1
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1000L);
topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L);
ContainerProperties container4Props = new ContainerProperties(topic1Partition0, topic1Partition1);
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props);
resettingContainer.setBeanName("b4");
final CountDownLatch latch4 = new CountDownLatch(2);
final CountDownLatch latch4 = new CountDownLatch(3);
final AtomicReference<String> receivedMessage = new AtomicReference<>();
container4Props.setMessageListener((MessageListener<Integer, String>) message -> {
ConcurrentMessageListenerContainerTests.this.logger.info("auto part -1: " + message);