GH-3039: Move Recovery in BlockingQueueConsumer into stop()

Fixes: #3039
Issue link: https://github.com/spring-projects/spring-amqp/issues/3039

Currently, the `BlockingQueueConsumer` initiates a Basic Recovery command on the
channel for transactional consumer immediately after Basic Cancel.
However, it is possible still to try to handle in-flight messages during `shutdownTimeout`
in the listener container

* Leave only Basic Cancel command in the `BlockingQueueConsumer.basicCancel()` API
* Revert `BlockingQueueConsumer.nextMessage(timeout)` method logic to normal loop
until message pulled from the in-memory cache is `null`
* Call `basicCancel(true)` from the `stop()` is not cancelled yet
* Perform `channel.basicRecover()` for transactional channel in the `stop()`.
This `stop()` is usually called from the listener container when in-flight messages
have not been processed during `shutdownTimeout`

**Auto-cherry-pick to `3.2.x` & `3.1.x`**
This commit is contained in:
Artem Bilan
2025-04-11 11:02:30 -04:00
parent 1dd3f4d078
commit 14fe215734

View File

@@ -464,18 +464,19 @@ public class BlockingQueueConsumer {
}
protected void basicCancel() {
basicCancel(false);
basicCancel(true);
}
protected void basicCancel(boolean expected) {
this.normalCancel = expected;
getConsumerTags()
.forEach(consumerTag -> {
if (this.channel.isOpen()) {
RabbitUtils.cancel(this.channel, consumerTag);
}
});
this.cancelled.set(true);
this.abortStarted = System.currentTimeMillis();
Collection<String> consumerTags = getConsumerTags();
if (!CollectionUtils.isEmpty(consumerTags)) {
RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional);
}
}
protected boolean hasDelivery() {
@@ -559,35 +560,12 @@ public class BlockingQueueConsumer {
if (!this.missingQueues.isEmpty()) {
checkMissingQueues();
}
if (this.transactional && cancelled()) {
throw consumerCancelledException(null);
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
if (message == null && this.cancelled.get()) {
this.activeObjectCounter.release(this);
throw new ConsumerCancelledException();
}
else {
Message message = handle(timeout < 0 ? this.queue.take() : this.queue.poll(timeout, TimeUnit.MILLISECONDS));
if (cancelled() && (message == null || this.transactional)) {
Long deliveryTagToNack = null;
if (message != null) {
deliveryTagToNack = message.getMessageProperties().getDeliveryTag();
}
throw consumerCancelledException(deliveryTagToNack);
}
else {
return message;
}
}
}
private ConsumerCancelledException consumerCancelledException(@Nullable Long deliveryTagToNack) {
this.activeObjectCounter.release(this);
ConsumerCancelledException consumerCancelledException = new ConsumerCancelledException();
if (deliveryTagToNack != null) {
rollbackOnExceptionIfNecessary(consumerCancelledException, deliveryTagToNack);
}
else {
this.deliveryTags.clear();
}
return consumerCancelledException;
return message;
}
/*
@@ -815,13 +793,21 @@ public class BlockingQueueConsumer {
this.abortStarted = System.currentTimeMillis();
}
if (!cancelled()) {
try {
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
basicCancel(true);
}
try {
if (this.transactional) {
/*
* Re-queue in-flight messages if any
* (after the consumer is cancelled to prevent the broker from simply sending them back to us).
* Does not require a tx.commit.
*/
this.channel.basicRecover(true);
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error closing consumer " + this, e);
}
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error closing consumer " + this, e);
}
}
if (logger.isDebugEnabled()) {