diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java index 212f089a..a099154d 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java @@ -70,6 +70,14 @@ public class ConnectionFactoryUtils { consumerChannel.remove(); } + /** + * See registerConsumerChannel. This method is called to retrieve the + * channel for this consumer. + */ + public static Channel getConsumerChannel() { + return consumerChannel.get(); + } + /** * Determine whether the given RabbitMQ Channel is transactional, that is, bound to the current thread by Spring's * transaction facilities. diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java index cb9ca8f9..87ea3938 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java @@ -154,7 +154,14 @@ public class RabbitResourceHolder extends ResourceHolderSupport { public void closeAll() { for (Channel channel : this.channels) { try { - channel.close(); + if (channel != ConnectionFactoryUtils.getConsumerChannel()) { + channel.close(); + } + else { + if (logger.isDebugEnabled()) { + logger.debug("Skipping close of consumer channel: " + channel.toString()); + } + } } catch (Throwable ex) { logger.debug("Could not close synchronized Rabbit Channel after transaction", ex); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index 494971bd..c5b164cd 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -473,8 +473,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im } else if (listener instanceof MessageListener) { boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted(channel); if (bindChannel) { + RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false); + resourceHolder.setSynchronizedWithTransaction(true); TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), - new RabbitResourceHolder(channel, false)); + resourceHolder); } try { doInvokeListener((MessageListener) listener, message); @@ -523,6 +525,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im */ if (isChannelLocallyTransacted(channelToUse) && !TransactionSynchronizationManager.isActualTransactionActive()) { + resourceHolder.setSynchronizedWithTransaction(true); TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), resourceHolder); boundHere = true; @@ -531,8 +534,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im else { // if locally transacted, bind the current channel to make it available to RabbitTemplate if (isChannelLocallyTransacted(channel)) { + RabbitResourceHolder localResourceHolder = new RabbitResourceHolder(channelToUse, false); + localResourceHolder.setSynchronizedWithTransaction(true); TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), - new RabbitResourceHolder(channelToUse, false)); + localResourceHolder); boundHere = true; } } @@ -543,6 +548,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im throw wrapToListenerExecutionFailedExceptionIfNeeded(e); } } finally { + if (resourceHolder != null && boundHere) { + // so the channel exposed (because exposeListenerChannel is false) will be closed + resourceHolder.setSynchronizedWithTransaction(false); + } ConnectionFactoryUtils.releaseResources(resourceHolder); if (boundHere) { // unbind if we bound diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java index 663d601f..7705fef5 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java @@ -15,6 +15,7 @@ */ package org.springframework.amqp.rabbit.listener; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; @@ -22,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -38,6 +40,7 @@ import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.support.AbstractPlatformTransactionManager; @@ -144,6 +147,12 @@ public class ExternalTxManagerTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory); + List channels = (List) dfa.getPropertyValue("cachedChannelsTransactional"); + assertEquals(0, channels.size()); + container.stop(); } @@ -159,7 +168,7 @@ public class ExternalTxManagerTests { final Channel onlyChannel = mock(Channel.class); when(onlyChannel.isOpen()).thenReturn(true); - final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); + final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); when(mockConnection.isOpen()).thenReturn(true); @@ -205,11 +214,11 @@ public class ExternalTxManagerTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exposed = new AtomicReference(); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, Channel channel) { exposed.set(channel); - RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); + RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory); rabbitTemplate.setChannelTransacted(true); // should use same channel as container rabbitTemplate.convertAndSend("foo", "bar", "baz"); @@ -238,6 +247,10 @@ public class ExternalTxManagerTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + verify(onlyChannel, Mockito.never()).close(); + container.stop(); assertSame(onlyChannel, exposed.get()); @@ -255,7 +268,7 @@ public class ExternalTxManagerTests { final Channel onlyChannel = mock(Channel.class); when(onlyChannel.isOpen()).thenReturn(true); - final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); + final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); when(mockConnection.isOpen()).thenReturn(true); @@ -301,11 +314,11 @@ public class ExternalTxManagerTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exposed = new AtomicReference(); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, Channel channel) { exposed.set(channel); - RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); + RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory); rabbitTemplate.setChannelTransacted(true); // should use same channel as container rabbitTemplate.convertAndSend("foo", "bar", "baz"); @@ -335,6 +348,10 @@ public class ExternalTxManagerTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + verify(onlyChannel, Mockito.never()).close(); + container.stop(); assertSame(onlyChannel, exposed.get()); @@ -428,6 +445,12 @@ public class ExternalTxManagerTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory); + List channels = (List) dfa.getPropertyValue("cachedChannelsTransactional"); + assertEquals(0, channels.size()); + container.stop(); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java index a1dec973..a54a2fbb 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java @@ -15,6 +15,7 @@ */ package org.springframework.amqp.rabbit.listener; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; @@ -22,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -37,6 +39,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.DirectFieldAccessor; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; @@ -138,6 +141,12 @@ public class LocallyTransactedTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory); + List channels = (List) dfa.getPropertyValue("cachedChannelsTransactional"); + assertEquals(0, channels.size()); + container.stop(); } @@ -153,7 +162,7 @@ public class LocallyTransactedTests { final Channel onlyChannel = mock(Channel.class); when(onlyChannel.isOpen()).thenReturn(true); - final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); + final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); when(mockConnection.isOpen()).thenReturn(true); @@ -199,11 +208,11 @@ public class LocallyTransactedTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exposed = new AtomicReference(); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, Channel channel) { exposed.set(channel); - RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); + RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory); rabbitTemplate.setChannelTransacted(true); // should use same channel as container rabbitTemplate.convertAndSend("foo", "bar", "baz"); @@ -231,6 +240,10 @@ public class LocallyTransactedTests { verify(onlyChannel).txCommit(); verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class)); + + // verify close() was never called on the channel + verify(onlyChannel, Mockito.never()).close(); + container.stop(); assertSame(onlyChannel, exposed.get()); @@ -250,7 +263,7 @@ public class LocallyTransactedTests { final Channel secondChannel = mock(Channel.class); when(secondChannel.isOpen()).thenReturn(true); - final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); + final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory); when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection); when(mockConnection.isOpen()).thenReturn(true); @@ -293,11 +306,11 @@ public class LocallyTransactedTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exposed = new AtomicReference(); - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, Channel channel) { exposed.set(channel); - RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); + RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory); rabbitTemplate.setChannelTransacted(true); // should use same channel as container rabbitTemplate.convertAndSend("foo", "bar", "baz"); @@ -331,5 +344,8 @@ public class LocallyTransactedTests { container.stop(); assertSame(secondChannel, exposed.get()); + + verify(firstChannel, Mockito.never()).close(); + verify(secondChannel, Mockito.times(1)).close(); } }