AMQP-274 Local Transaction Issues

When running a listener container with local transactions
(channelTransacted, and no external transaction manager), the
consumer's channel is bound to the thread for use by downstream
RabbitTemplates.

However, the syncronizedWithTransaction boolean was not set so
the RabbitTemplate closed the channel after its operation.

We should never close the consumer's channel.

The solution is to set the boolean when binding the resource.

In addition, when using a RabbitTransactionManager, the
RabbitResourceHolder.closeAll() method would close the consumer's
channel.

Previously, the consumer's channel was registered with a
ThreadLocal in the ConnectionFactoryUtils. This enabled
the doGetTransactionalResourceHolder method to bind the
consumer's channel.

The RabbitResourceHolder.closeAll() now examines the channels
is it closing and skips the close for the consumer's channel.

Added tests to the Local and External transaction test cases
to ensure the appropriate channel.close() calls are executed,
depending on the scenario. e.g. a local transaction with
exposeListenerChannel=false should close() the exposed channel
but not the consumer's channel.
This commit is contained in:
Gary Russell
2012-10-25 18:14:09 -04:00
parent dd54e66fab
commit 9c29a74add
5 changed files with 78 additions and 15 deletions

View File

@@ -70,6 +70,14 @@ public class ConnectionFactoryUtils {
consumerChannel.remove();
}
/**
* See registerConsumerChannel. This method is called to retrieve the
* channel for this consumer.
*/
public static Channel getConsumerChannel() {
return consumerChannel.get();
}
/**
* Determine whether the given RabbitMQ Channel is transactional, that is, bound to the current thread by Spring's
* transaction facilities.

View File

@@ -154,7 +154,14 @@ public class RabbitResourceHolder extends ResourceHolderSupport {
public void closeAll() {
for (Channel channel : this.channels) {
try {
channel.close();
if (channel != ConnectionFactoryUtils.getConsumerChannel()) {
channel.close();
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping close of consumer channel: " + channel.toString());
}
}
} catch (Throwable ex) {
logger.debug("Could not close synchronized Rabbit Channel after transaction", ex);
}

View File

@@ -473,8 +473,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
} else if (listener instanceof MessageListener) {
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted(channel);
if (bindChannel) {
RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
new RabbitResourceHolder(channel, false));
resourceHolder);
}
try {
doInvokeListener((MessageListener) listener, message);
@@ -523,6 +525,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
*/
if (isChannelLocallyTransacted(channelToUse) &&
!TransactionSynchronizationManager.isActualTransactionActive()) {
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
boundHere = true;
@@ -531,8 +534,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
else {
// if locally transacted, bind the current channel to make it available to RabbitTemplate
if (isChannelLocallyTransacted(channel)) {
RabbitResourceHolder localResourceHolder = new RabbitResourceHolder(channelToUse, false);
localResourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
new RabbitResourceHolder(channelToUse, false));
localResourceHolder);
boundHere = true;
}
}
@@ -543,6 +548,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
throw wrapToListenerExecutionFailedExceptionIfNeeded(e);
}
} finally {
if (resourceHolder != null && boundHere) {
// so the channel exposed (because exposeListenerChannel is false) will be closed
resourceHolder.setSynchronizedWithTransaction(false);
}
ConnectionFactoryUtils.releaseResources(resourceHolder);
if (boundHere) {
// unbind if we bound

View File

@@ -15,6 +15,7 @@
*/
package org.springframework.amqp.rabbit.listener;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
@@ -22,6 +23,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -38,6 +40,7 @@ import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
@@ -144,6 +147,12 @@ public class ExternalTxManagerTests {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
// verify close() was never called on the channel
DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory);
List<?> channels = (List<?>) dfa.getPropertyValue("cachedChannelsTransactional");
assertEquals(0, channels.size());
container.stop();
}
@@ -159,7 +168,7 @@ public class ExternalTxManagerTests {
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
@@ -205,11 +214,11 @@ public class ExternalTxManagerTests {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
@@ -238,6 +247,10 @@ public class ExternalTxManagerTests {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
// verify close() was never called on the channel
verify(onlyChannel, Mockito.never()).close();
container.stop();
assertSame(onlyChannel, exposed.get());
@@ -255,7 +268,7 @@ public class ExternalTxManagerTests {
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
@@ -301,11 +314,11 @@ public class ExternalTxManagerTests {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
@@ -335,6 +348,10 @@ public class ExternalTxManagerTests {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
// verify close() was never called on the channel
verify(onlyChannel, Mockito.never()).close();
container.stop();
assertSame(onlyChannel, exposed.get());
@@ -428,6 +445,12 @@ public class ExternalTxManagerTests {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
// verify close() was never called on the channel
DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory);
List<?> channels = (List<?>) dfa.getPropertyValue("cachedChannelsTransactional");
assertEquals(0, channels.size());
container.stop();
}

View File

@@ -15,6 +15,7 @@
*/
package org.springframework.amqp.rabbit.listener;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
@@ -22,6 +23,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -37,6 +39,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.DirectFieldAccessor;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
@@ -138,6 +141,12 @@ public class LocallyTransactedTests {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
// verify close() was never called on the channel
DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory);
List<?> channels = (List<?>) dfa.getPropertyValue("cachedChannelsTransactional");
assertEquals(0, channels.size());
container.stop();
}
@@ -153,7 +162,7 @@ public class LocallyTransactedTests {
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
@@ -199,11 +208,11 @@ public class LocallyTransactedTests {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
@@ -231,6 +240,10 @@ public class LocallyTransactedTests {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
// verify close() was never called on the channel
verify(onlyChannel, Mockito.never()).close();
container.stop();
assertSame(onlyChannel, exposed.get());
@@ -250,7 +263,7 @@ public class LocallyTransactedTests {
final Channel secondChannel = mock(Channel.class);
when(secondChannel.isOpen()).thenReturn(true);
final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
@@ -293,11 +306,11 @@ public class LocallyTransactedTests {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
@@ -331,5 +344,8 @@ public class LocallyTransactedTests {
container.stop();
assertSame(secondChannel, exposed.get());
verify(firstChannel, Mockito.never()).close();
verify(secondChannel, Mockito.times(1)).close();
}
}