From 9c29a74addcf8ebefb2a2fcbe26b62014181bbd9 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 25 Oct 2012 18:14:09 -0400 Subject: [PATCH] AMQP-274 Local Transaction Issues When running a listener container with local transactions (channelTransacted, and no external transaction manager), the consumer's channel is bound to the thread for use by downstream RabbitTemplates. However, the syncronizedWithTransaction boolean was not set so the RabbitTemplate closed the channel after its operation. We should never close the consumer's channel. The solution is to set the boolean when binding the resource. In addition, when using a RabbitTransactionManager, the RabbitResourceHolder.closeAll() method would close the consumer's channel. Previously, the consumer's channel was registered with a ThreadLocal in the ConnectionFactoryUtils. This enabled the doGetTransactionalResourceHolder method to bind the consumer's channel. The RabbitResourceHolder.closeAll() now examines the channels is it closing and skips the close for the consumer's channel. Added tests to the Local and External transaction test cases to ensure the appropriate channel.close() calls are executed, depending on the scenario. e.g. a local transaction with exposeListenerChannel=false should close() the exposed channel but not the consumer's channel. --- .../connection/ConnectionFactoryUtils.java | 8 +++++ .../connection/RabbitResourceHolder.java | 9 ++++- .../AbstractMessageListenerContainer.java | 13 +++++-- .../listener/ExternalTxManagerTests.java | 35 +++++++++++++++---- .../listener/LocallyTransactedTests.java | 28 +++++++++++---- 5 files changed, 78 insertions(+), 15 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java index 212f089a..a099154d 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java @@ -70,6 +70,14 @@ public class ConnectionFactoryUtils { consumerChannel.remove(); } + /** + * See registerConsumerChannel. This method is called to retrieve the + * channel for this consumer. + */ + public static Channel getConsumerChannel() { + return consumerChannel.get(); + } + /** * Determine whether the given RabbitMQ Channel is transactional, that is, bound to the current thread by Spring's * transaction facilities. diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java index cb9ca8f9..87ea3938 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java @@ -154,7 +154,14 @@ public class RabbitResourceHolder extends ResourceHolderSupport { public void closeAll() { for (Channel channel : this.channels) { try { - channel.close(); + if (channel != ConnectionFactoryUtils.getConsumerChannel()) { + channel.close(); + } + else { + if (logger.isDebugEnabled()) { + logger.debug("Skipping close of consumer channel: " + channel.toString()); + } + } } catch (Throwable ex) { logger.debug("Could not close synchronized Rabbit Channel after transaction", ex); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index 494971bd..c5b164cd 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -473,8 +473,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im } else if (listener instanceof MessageListener) { boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted(channel); if (bindChannel) { + RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false); + resourceHolder.setSynchronizedWithTransaction(true); TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), - new RabbitResourceHolder(channel, false)); + resourceHolder); } try { doInvokeListener((MessageListener) listener, message); @@ -523,6 +525,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im */ if (isChannelLocallyTransacted(channelToUse) && !TransactionSynchronizationManager.isActualTransactionActive()) { + resourceHolder.setSynchronizedWithTransaction(true); TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), resourceHolder); boundHere = true; @@ -531,8 +534,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im else { // if locally transacted, bind the current channel to make it available to RabbitTemplate if (isChannelLocallyTransacted(channel)) { + RabbitResourceHolder localResourceHolder = new RabbitResourceHolder(channelToUse, false); + localResourceHolder.setSynchronizedWithTransaction(true); TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), - new RabbitResourceHolder(channelToUse, false)); + localResourceHolder); boundHere = true; } } @@ -543,6 +548,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im throw wrapToListenerExecutionFailedExceptionIfNeeded(e); } } finally { + if (resourceHolder != null && boundHere) { + // so the channel exposed (because exposeListenerChannel is false) will be closed + resourceHolder.setSynchronizedWithTransaction(false); + } ConnectionFactoryUtils.releaseResources(resourceHolder); if (boundHere) { // unbind if we bound diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java index 663d601f..7705fef5 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java @@ -15,6 +15,7 @@ */ package org.springframework.amqp.rabbit.listener; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; @@ -22,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -38,6 +40,7 @@ import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.support.AbstractPlatformTransactionManager; @@ -144,6 +147,12 @@ public class ExternalTxManagerTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory); + List channels = (List) dfa.getPropertyValue("cachedChannelsTransactional"); + assertEquals(0, channels.size()); + container.stop(); } @@ -159,7 +168,7 @@ public class ExternalTxManagerTests { final Channel onlyChannel = mock(Channel.class); when(onlyChannel.isOpen()).thenReturn(true); - final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); + final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); when(mockConnection.isOpen()).thenReturn(true); @@ -205,11 +214,11 @@ public class ExternalTxManagerTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exposed = new AtomicReference(); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, Channel channel) { exposed.set(channel); - RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); + RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory); rabbitTemplate.setChannelTransacted(true); // should use same channel as container rabbitTemplate.convertAndSend("foo", "bar", "baz"); @@ -238,6 +247,10 @@ public class ExternalTxManagerTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + verify(onlyChannel, Mockito.never()).close(); + container.stop(); assertSame(onlyChannel, exposed.get()); @@ -255,7 +268,7 @@ public class ExternalTxManagerTests { final Channel onlyChannel = mock(Channel.class); when(onlyChannel.isOpen()).thenReturn(true); - final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); + final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); when(mockConnection.isOpen()).thenReturn(true); @@ -301,11 +314,11 @@ public class ExternalTxManagerTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exposed = new AtomicReference(); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, Channel channel) { exposed.set(channel); - RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); + RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory); rabbitTemplate.setChannelTransacted(true); // should use same channel as container rabbitTemplate.convertAndSend("foo", "bar", "baz"); @@ -335,6 +348,10 @@ public class ExternalTxManagerTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + verify(onlyChannel, Mockito.never()).close(); + container.stop(); assertSame(onlyChannel, exposed.get()); @@ -428,6 +445,12 @@ public class ExternalTxManagerTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory); + List channels = (List) dfa.getPropertyValue("cachedChannelsTransactional"); + assertEquals(0, channels.size()); + container.stop(); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java index a1dec973..a54a2fbb 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java @@ -15,6 +15,7 @@ */ package org.springframework.amqp.rabbit.listener; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; @@ -22,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -37,6 +39,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.DirectFieldAccessor; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; @@ -138,6 +141,12 @@ public class LocallyTransactedTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory); + List channels = (List) dfa.getPropertyValue("cachedChannelsTransactional"); + assertEquals(0, channels.size()); + container.stop(); } @@ -153,7 +162,7 @@ public class LocallyTransactedTests { final Channel onlyChannel = mock(Channel.class); when(onlyChannel.isOpen()).thenReturn(true); - final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); + final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); when(mockConnection.isOpen()).thenReturn(true); @@ -199,11 +208,11 @@ public class LocallyTransactedTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exposed = new AtomicReference(); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, Channel channel) { exposed.set(channel); - RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); + RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory); rabbitTemplate.setChannelTransacted(true); // should use same channel as container rabbitTemplate.convertAndSend("foo", "bar", "baz"); @@ -231,6 +240,10 @@ public class LocallyTransactedTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + verify(onlyChannel, Mockito.never()).close(); + container.stop(); assertSame(onlyChannel, exposed.get()); @@ -250,7 +263,7 @@ public class LocallyTransactedTests { final Channel secondChannel = mock(Channel.class); when(secondChannel.isOpen()).thenReturn(true); - final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); + final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); when(mockConnection.isOpen()).thenReturn(true); @@ -293,11 +306,11 @@ public class LocallyTransactedTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exposed = new AtomicReference(); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, Channel channel) { exposed.set(channel); - RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); + RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory); rabbitTemplate.setChannelTransacted(true); // should use same channel as container rabbitTemplate.convertAndSend("foo", "bar", "baz"); @@ -331,5 +344,8 @@ public class LocallyTransactedTests { container.stop(); assertSame(secondChannel, exposed.get()); + + verify(firstChannel, Mockito.never()).close(); + verify(secondChannel, Mockito.times(1)).close(); } }