diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index 7fef25ac..0d2d8817 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -22,9 +22,9 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import org.springframework.amqp.AmqpException; @@ -502,7 +502,7 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations, protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message) { Message replyMessage = this.execute(new ChannelCallback() { public Message doInRabbit(Channel channel) throws Exception { - final SynchronousQueue replyHandoff = new SynchronousQueue(); + final ArrayBlockingQueue replyHandoff = new ArrayBlockingQueue(1); Assert.isNull(message.getMessageProperties().getReplyTo(), "Send-and-receive methods can only be used if the Message does not already have a replyTo property."); @@ -510,7 +510,7 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations, String replyTo = queueDeclaration.getQueue(); message.getMessageProperties().setReplyTo(replyTo); - boolean noAck = false; + boolean noAck = true; String consumerTag = UUID.randomUUID().toString(); boolean noLocal = true; boolean exclusive = true; diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java index 3195ef36..f67a0aa7 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java @@ -17,17 +17,22 @@ package org.springframework.amqp.rabbit.core; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.amqp.utils.SerializationUtils; import org.springframework.transaction.TransactionDefinition; @@ -38,9 +43,13 @@ import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.impl.AMQImpl; /** * @author Gary Russell @@ -135,4 +144,33 @@ public class RabbitTemplateTests { assertSame(input, message); } + @SuppressWarnings("unchecked") + @Test // AMQP-249 + public void dontHangConsumerThread() throws Exception { + ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class); + Connection mockConnection = mock(Connection.class); + Channel mockChannel = mock(Channel.class); + + when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); + when(mockConnection.isOpen()).thenReturn(true); + when(mockConnection.createChannel()).thenReturn(mockChannel); + + when(mockChannel.queueDeclare()).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0)); + + final AtomicReference consumer = new AtomicReference(); + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Throwable { + consumer.set((Consumer) invocation.getArguments()[6]); + return null; + } + }).when(mockChannel).basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyString(), + Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap(), Mockito.any(Consumer.class)); + RabbitTemplate template = new RabbitTemplate(new SingleConnectionFactory(mockConnectionFactory)); + template.setReplyTimeout(1); + Message input = new Message("Hello, world!".getBytes(), new MessageProperties()); + template.doSendAndReceiveWithTemporary("foo", "bar", input); + Envelope envelope = new Envelope(1, false, "foo", "bar"); + // used to hang here because of the SynchronousQueue and doSendAndReceive() already exited + consumer.get().handleDelivery("foo", envelope, new AMQP.BasicProperties(), new byte[0]); + } }