From 8aa38bd0bbdcdc61f580e1f4487b9c5f0ec93223 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 19 Mar 2025 12:13:08 -0400 Subject: [PATCH] Use `AtomicBoolean` for receive ot ignore other messages The `RabbitAmqpTemplate.receive()` uses an AMQP 1.0 `Consumer` to receive a single message. There is a race condition when we have several messages in the queue, so after receiving one, this consumer may deliver the next one. * Use `AtomicBoolean messageReceived` to accept only one message from the consumer. Since the `CompletableFuture` closes that consumer on its completion, there is just small chance that a new message would be handed to the handler. And since we do nothing with this new message, it will come back to the queue after we close consumer. --- .../amqp/rabbitmq/client/RabbitAmqpTemplate.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplate.java b/spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplate.java index 559f744b..27b2d0cf 100644 --- a/spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplate.java +++ b/spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplate.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -321,6 +322,8 @@ public class RabbitAmqpTemplate implements AsyncAmqpTemplate, DisposableBean { public CompletableFuture receive(String queueName) { CompletableFuture messageFuture = new CompletableFuture<>(); + AtomicBoolean messageReceived = new AtomicBoolean(); + Consumer consumer = this.connectionFactory.getConnection() .consumerBuilder() @@ -328,8 +331,10 @@ public class RabbitAmqpTemplate implements AsyncAmqpTemplate, DisposableBean { .initialCredits(1) .priority(10) .messageHandler((context, message) -> { - context.accept(); - messageFuture.complete(RabbitAmqpUtils.fromAmqpMessage(message, null)); + if (messageReceived.compareAndSet(false, true)) { + context.accept(); + messageFuture.complete(RabbitAmqpUtils.fromAmqpMessage(message, null)); + } }) .build();