diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 1dd53fa8..7bf5f877 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -691,12 +691,18 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener if (offset < 0) { this.consumer.seekToEnd(topicPartition); - newOffset = this.consumer.position(topicPartition) + offset; + newOffset = Math.max(0, this.consumer.position(topicPartition) + offset); } - this.consumer.seek(topicPartition, newOffset); - if (this.logger.isDebugEnabled()) { - this.logger.debug("Reset " + topicPartition + " to offset " + newOffset); + try { + this.consumer.seek(topicPartition, newOffset); + if (this.logger.isDebugEnabled()) { + this.logger.debug("Reset " + topicPartition + " to offset " + newOffset); + } + } + catch (Exception e) { + logger.error("Failed to set initial offset for " + topicPartition + + " at " + newOffset + ". Positioned to " + this.consumer.position(topicPartition), e); } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 0be60b6b..48707612 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -305,13 +305,13 @@ public class ConcurrentMessageListenerContainerTests { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); cf = new DefaultKafkaConsumerFactory<>(props); - // reset minus one - topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1L); + // reset beginning for part 0, minus one for part 1 + topic1Partition0 = new TopicPartitionInitialOffset(topic3, 0, -1000L); topic1Partition1 = new TopicPartitionInitialOffset(topic3, 1, -1L); ContainerProperties container4Props = new ContainerProperties(topic1Partition0, topic1Partition1); resettingContainer = new ConcurrentMessageListenerContainer<>(cf, container4Props); resettingContainer.setBeanName("b4"); - final CountDownLatch latch4 = new CountDownLatch(2); + final CountDownLatch latch4 = new CountDownLatch(3); final AtomicReference receivedMessage = new AtomicReference<>(); container4Props.setMessageListener((MessageListener) message -> { ConcurrentMessageListenerContainerTests.this.logger.info("auto part -1: " + message);