GH-3079: Allow ACKs within the ShutdownTimeout period
Fixes: #3079
Issue link: https://github.com/spring-projects/spring-amqp/issues/3079
Remove `cancelled()` condition from the `BlockingQueueConsumer.commitIfNecessary()` to allow initiated acks to be committed.
The cancelled consumer means no new deliveries, but that does not mean we cannot ack outstanding messages.
Actually, this is a leftover after previous fix with assumption that messages have to be returned to the queue with `Recovery` command when consumer is cancelled.
Signed-off-by: Marco Cozzi <marco@codeatlas.it>
(cherry picked from commit c3cc65dad9)
This commit is contained in:
committed by
Spring Builds
parent
f4db40298e
commit
5d99834227
@@ -884,7 +884,7 @@ public class BlockingQueueConsumer {
|
||||
|
||||
/**
|
||||
* Perform a commit or message acknowledgement, as appropriate.
|
||||
* NOTE: This method was never been intended tobe public.
|
||||
* NOTE: This method was never intended to be public.
|
||||
* @param localTx Whether the channel is locally transacted.
|
||||
* @return true if at least one delivery tag exists.
|
||||
* @deprecated in favor of {@link #commitIfNecessary(boolean, boolean)}
|
||||
@@ -917,7 +917,7 @@ public class BlockingQueueConsumer {
|
||||
try {
|
||||
boolean ackRequired = forceAck || (!this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual());
|
||||
|
||||
if (ackRequired && (!this.transactional || (isLocallyTransacted && !cancelled()))) {
|
||||
if (ackRequired && (!this.transactional || isLocallyTransacted)) {
|
||||
OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
|
||||
deliveryTag.ifPresent((tag) -> {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user