Use AtomicBoolean for receive ot ignore other messages
The `RabbitAmqpTemplate.receive()` uses an AMQP 1.0 `Consumer` to receive a single message. There is a race condition when we have several messages in the queue, so after receiving one, this consumer may deliver the next one. * Use `AtomicBoolean messageReceived` to accept only one message from the consumer. Since the `CompletableFuture` closes that consumer on its completion, there is just small chance that a new message would be handed to the handler. And since we do nothing with this new message, it will come back to the queue after we close consumer.
This commit is contained in:
@@ -20,6 +20,7 @@ import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Supplier;
|
||||
@@ -321,6 +322,8 @@ public class RabbitAmqpTemplate implements AsyncAmqpTemplate, DisposableBean {
|
||||
public CompletableFuture<Message> receive(String queueName) {
|
||||
CompletableFuture<Message> messageFuture = new CompletableFuture<>();
|
||||
|
||||
AtomicBoolean messageReceived = new AtomicBoolean();
|
||||
|
||||
Consumer consumer =
|
||||
this.connectionFactory.getConnection()
|
||||
.consumerBuilder()
|
||||
@@ -328,8 +331,10 @@ public class RabbitAmqpTemplate implements AsyncAmqpTemplate, DisposableBean {
|
||||
.initialCredits(1)
|
||||
.priority(10)
|
||||
.messageHandler((context, message) -> {
|
||||
context.accept();
|
||||
messageFuture.complete(RabbitAmqpUtils.fromAmqpMessage(message, null));
|
||||
if (messageReceived.compareAndSet(false, true)) {
|
||||
context.accept();
|
||||
messageFuture.complete(RabbitAmqpUtils.fromAmqpMessage(message, null));
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user