GH-146: Fix TopicPartition Negative Reset
Fixes #146 When using reset relative to the current end, ensure the seek value is not less than zero. Add `consumer.position()` to error log message to reflect reality on `seek()`failure
This commit is contained in:
committed by
Artem Bilan
parent
d04b53a7ab
commit
6d80e0e023
@@ -691,12 +691,18 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
|
||||
|
||||
if (offset < 0) {
|
||||
this.consumer.seekToEnd(topicPartition);
|
||||
newOffset = this.consumer.position(topicPartition) + offset;
|
||||
newOffset = Math.max(0, this.consumer.position(topicPartition) + offset);
|
||||
}
|
||||
|
||||
this.consumer.seek(topicPartition, newOffset);
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Reset " + topicPartition + " to offset " + newOffset);
|
||||
try {
|
||||
this.consumer.seek(topicPartition, newOffset);
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug("Reset " + topicPartition + " to offset " + newOffset);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Failed to set initial offset for " + topicPartition
|
||||
+ " at " + newOffset + ". Positioned to " + this.consumer.position(topicPartition), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -305,13 +305,13 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
cf = new DefaultKafkaConsumerFactory<>(props);
|
||||
// reset minus one
|
||||
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1L);
|
||||
// reset beginning for part 0, minus one for part 1
|
||||
topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1000L);
|
||||
topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L);
|
||||
ContainerProperties container4Props = new ContainerProperties(topic1Partition0, topic1Partition1);
|
||||
resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props);
|
||||
resettingContainer.setBeanName("b4");
|
||||
final CountDownLatch latch4 = new CountDownLatch(2);
|
||||
final CountDownLatch latch4 = new CountDownLatch(3);
|
||||
final AtomicReference<String> receivedMessage = new AtomicReference<>();
|
||||
container4Props.setMessageListener((MessageListener<Integer, String>) message -> {
|
||||
ConcurrentMessageListenerContainerTests.this.logger.info("auto part -1: " + message);
|
||||
|
||||
Reference in New Issue
Block a user