AMQP-249 Fix Hanging Thread/Unacked Messages

Previously, the temporary reply channel used for send and
receive operations (when not using a reply-queue) were configured
for acks, but no ack was sent. The queue is temporary so was
removed when the consumer was cancelled but, with cached channels,
the admin UI showed an unacked count for the channel.

In addition, there was a race condition that could cause a hung
thread. The handover to the calling thread was done using a
SynchronousQueue; if the caller timed out just as the reply
arrived, the consumer thread could hang on the put.

Changed the temporary reply queue declaration to use auto-ack
and changed the SynchronousQueue to an ArrayBlockingQueue.
This commit is contained in:
Gary Russell
2012-07-12 17:11:40 -04:00
parent 0261937f08
commit a5d84a820f
2 changed files with 41 additions and 3 deletions

View File

@@ -22,9 +22,9 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.AmqpException;
@@ -502,7 +502,7 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations,
protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message) {
Message replyMessage = this.execute(new ChannelCallback<Message>() {
public Message doInRabbit(Channel channel) throws Exception {
final SynchronousQueue<Message> replyHandoff = new SynchronousQueue<Message>();
final ArrayBlockingQueue<Message> replyHandoff = new ArrayBlockingQueue<Message>(1);
Assert.isNull(message.getMessageProperties().getReplyTo(),
"Send-and-receive methods can only be used if the Message does not already have a replyTo property.");
@@ -510,7 +510,7 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations,
String replyTo = queueDeclaration.getQueue();
message.getMessageProperties().setReplyTo(replyTo);
boolean noAck = false;
boolean noAck = true;
String consumerTag = UUID.randomUUID().toString();
boolean noLocal = true;
boolean exclusive = true;

View File

@@ -17,17 +17,22 @@ package org.springframework.amqp.rabbit.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.transaction.TransactionDefinition;
@@ -38,9 +43,13 @@ import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.AMQImpl;
/**
* @author Gary Russell
@@ -135,4 +144,33 @@ public class RabbitTemplateTests {
assertSame(input, message);
}
@SuppressWarnings("unchecked")
@Test // AMQP-249
public void dontHangConsumerThread() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
Channel mockChannel = mock(Channel.class);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
when(mockConnection.createChannel()).thenReturn(mockChannel);
when(mockChannel.queueDeclare()).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[6]);
return null;
}
}).when(mockChannel).basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyString(),
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap(), Mockito.any(Consumer.class));
RabbitTemplate template = new RabbitTemplate(new SingleConnectionFactory(mockConnectionFactory));
template.setReplyTimeout(1);
Message input = new Message("Hello, world!".getBytes(), new MessageProperties());
template.doSendAndReceiveWithTemporary("foo", "bar", input);
Envelope envelope = new Envelope(1, false, "foo", "bar");
// used to hang here because of the SynchronousQueue and doSendAndReceive() already exited
consumer.get().handleDelivery("foo", envelope, new AMQP.BasicProperties(), new byte[0]);
}
}