Merge branch 'AMQP-249-backport' into 1.1.x
This commit is contained in:
@@ -22,9 +22,9 @@ import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.springframework.amqp.AmqpException;
|
||||
@@ -502,7 +502,7 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations,
|
||||
protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message) {
|
||||
Message replyMessage = this.execute(new ChannelCallback<Message>() {
|
||||
public Message doInRabbit(Channel channel) throws Exception {
|
||||
final SynchronousQueue<Message> replyHandoff = new SynchronousQueue<Message>();
|
||||
final ArrayBlockingQueue<Message> replyHandoff = new ArrayBlockingQueue<Message>(1);
|
||||
|
||||
Assert.isNull(message.getMessageProperties().getReplyTo(),
|
||||
"Send-and-receive methods can only be used if the Message does not already have a replyTo property.");
|
||||
@@ -510,7 +510,7 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations,
|
||||
String replyTo = queueDeclaration.getQueue();
|
||||
message.getMessageProperties().setReplyTo(replyTo);
|
||||
|
||||
boolean noAck = false;
|
||||
boolean noAck = true;
|
||||
String consumerTag = UUID.randomUUID().toString();
|
||||
boolean noLocal = true;
|
||||
boolean exclusive = true;
|
||||
|
||||
@@ -17,17 +17,22 @@ package org.springframework.amqp.rabbit.core;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
|
||||
import org.springframework.amqp.support.converter.SimpleMessageConverter;
|
||||
import org.springframework.amqp.utils.SerializationUtils;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
@@ -38,9 +43,13 @@ import org.springframework.transaction.support.DefaultTransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.Consumer;
|
||||
import com.rabbitmq.client.Envelope;
|
||||
import com.rabbitmq.client.impl.AMQImpl;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
@@ -135,4 +144,33 @@ public class RabbitTemplateTests {
|
||||
assertSame(input, message);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test // AMQP-249
|
||||
public void dontHangConsumerThread() throws Exception {
|
||||
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
|
||||
Connection mockConnection = mock(Connection.class);
|
||||
Channel mockChannel = mock(Channel.class);
|
||||
|
||||
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
|
||||
when(mockConnection.isOpen()).thenReturn(true);
|
||||
when(mockConnection.createChannel()).thenReturn(mockChannel);
|
||||
|
||||
when(mockChannel.queueDeclare()).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
|
||||
|
||||
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
|
||||
doAnswer(new Answer<Object>() {
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
consumer.set((Consumer) invocation.getArguments()[6]);
|
||||
return null;
|
||||
}
|
||||
}).when(mockChannel).basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyString(),
|
||||
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap(), Mockito.any(Consumer.class));
|
||||
RabbitTemplate template = new RabbitTemplate(new SingleConnectionFactory(mockConnectionFactory));
|
||||
template.setReplyTimeout(1);
|
||||
Message input = new Message("Hello, world!".getBytes(), new MessageProperties());
|
||||
template.doSendAndReceiveWithTemporary("foo", "bar", input);
|
||||
Envelope envelope = new Envelope(1, false, "foo", "bar");
|
||||
// used to hang here because of the SynchronousQueue and doSendAndReceive() already exited
|
||||
consumer.get().handleDelivery("foo", envelope, new AMQP.BasicProperties(), new byte[0]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user