AMQP-260 Bind Channel With Local Transaction

When using local transactions (no tx manager), the channel
should be bound to the thread to enable any upstream
RabbitTemplate operations to use the same channel.

When an external tx manager is a RabbitTransactionManager,
a different channel was bound, and the consumer channel
was not committed.

1. When invoking a MessageListener (not ChannelAware),
and exposeListenerChannel is true (default),
bind the consumer's channel to the thread to make
it available for up-stack RabbitTemplate operations.

2. When invoking a ChannelAwareMessageListener, and there
is no external tx manager, and exposeListenerChannel is
true (default), bind the channel to the thread to make
it available for up-stack RabbitTemplate operations.

3. When invoking a ChannelAwareMessageListener, and there
is no external tx manager, and exposeListenerChannel is
false, bind the new (temporary) channel to the thread to
make it available for up-stack RabbitTemplate operations.
Note that work on this temporary channel is committed on
the return and txSize has no bearing.

4. Whenever there is an external transaction manager, the
exposeListenerChannel is ignored - it is always exposed in
that case this is already documented in javadoc - add a
WARN log to the initialization code.

5. When there is an external transaction manager, (but not
a RabbitTransactionManager) the
listener's channel is always bound to the thread. This was
always the case and is mentioned here for completeness.

6. Previously, when an external transaction manager was
a RabbitTransactionManager, the wrong channel was bound
to the thread, and it was that channel that was committed,
instead of the listener channel. This was the originally
reported issue, with the workaround being to remove
the transaction manager; but that caused no channel to
be bound.

This last problem was due to the fact that the transaction
template obtained a new connection and bound it. It was
that connection that the TxManager committed.

Solution was to add a ThreadLocal to ConnectionFactoryUtils
to hold consumer channels. When the transaction starts,
the utils now look at this ThreadLocal before creating
a new connection.

The connection is unregistered when the consumer exits.

Add tests for transacted (with/without txMgr) for MessageListener
and ChannelAwareMessageListener to ensure the channel
is bound for use by up-stack templates.

Add test with Rabbit TxMgr showing correct channel
is bound and committed.

AMQP-260 Polishing
This commit is contained in:
Gary Russell
2012-07-24 14:46:18 -04:00
committed by Oleg Zhurakousky
parent 6667565f54
commit 90c526eec6
5 changed files with 895 additions and 4 deletions

View File

@@ -15,6 +15,8 @@ package org.springframework.amqp.rabbit.connection;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpIOException;
import org.springframework.transaction.support.ResourceHolderSynchronization;
import org.springframework.transaction.support.TransactionSynchronization;
@@ -37,6 +39,37 @@ import com.rabbitmq.client.Channel;
*/
public class ConnectionFactoryUtils {
private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class);
private static final ThreadLocal<Channel> consumerChannel = new ThreadLocal<Channel>();
/**
* If a listener container is configured to use a RabbitTransactionManager, the
* consumer's channel is registered here so that it is used as the bound resource
* when the transaction actually starts. It is normally not necessary to use
* an external transaction manager because local transactions work the same in that
* the channel is bound to the thread. This is for the case when a user happens
* to wire in a RabbitTransactionManager.
* @param channel
*/
public static void registerConsumerChannel(Channel channel) {
if (logger.isDebugEnabled()) {
logger.debug("Registering consumer channel" + channel);
}
consumerChannel.set(channel);
}
/**
* See registerConsumerChannel. This method is called to unregister
* the channel when the consumer exits.
*/
public static void unRegisterConsumerChannel() {
if (logger.isDebugEnabled()) {
logger.debug("Unregistering consumer channel" + consumerChannel.get());
}
consumerChannel.remove();
}
/**
* Determine whether the given RabbitMQ Channel is transactional, that is, bound to the current thread by Spring's
* transaction facilities.
@@ -122,7 +155,10 @@ public class ConnectionFactoryUtils {
connection = resourceFactory.createConnection();
resourceHolderToUse.addConnection(connection);
}
channel = resourceFactory.createChannel(connection);
channel = consumerChannel.get();
if (channel == null) {
channel = resourceFactory.createChannel(connection);
}
resourceHolderToUse.addChannel(channel, connection);
if (resourceHolderToUse != resourceHolder) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2011 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -21,10 +21,12 @@ import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
@@ -35,6 +37,7 @@ import com.rabbitmq.client.Channel;
* @author Mark Fisher
* @author Dave Syer
* @author James Carr
* @author Gary Russell
*/
public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements BeanNameAware, DisposableBean,
SmartLifecycle {
@@ -468,7 +471,20 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
if (listener instanceof ChannelAwareMessageListener) {
doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
} else if (listener instanceof MessageListener) {
doInvokeListener((MessageListener) listener, message);
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted(channel);
if (bindChannel) {
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
new RabbitResourceHolder(channel, false));
}
try {
doInvokeListener((MessageListener) listener, message);
}
finally {
if (bindChannel) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
}
}
} else if (listener != null) {
throw new IllegalArgumentException("Only MessageListener and SessionAwareMessageListener supported: "
+ listener);
@@ -493,12 +509,32 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
throws Exception {
RabbitResourceHolder resourceHolder = null;
Channel channelToUse = channel;
boolean boundHere = false;
try {
Channel channelToUse = channel;
if (!isExposeListenerChannel()) {
// We need to expose a separate Channel.
resourceHolder = getTransactionalResourceHolder();
channelToUse = resourceHolder.getChannel();
/*
* If there is a real transaction, the resource will have been bound; otherwise
* we need to bind it temporarily here. Any work done on this channel
* will be committed in the finally block.
*/
if (isChannelLocallyTransacted(channelToUse) &&
!TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
boundHere = true;
}
}
else {
// if locally transacted, bind the current channel to make it available to RabbitTemplate
if (isChannelLocallyTransacted(channel)) {
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
new RabbitResourceHolder(channelToUse, false));
boundHere = true;
}
}
// Actually invoke the message listener...
try {
@@ -508,6 +544,19 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor im
}
} finally {
ConnectionFactoryUtils.releaseResources(resourceHolder);
if (boundHere) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
if (!isExposeListenerChannel() && isChannelLocallyTransacted(channelToUse)) {
/*
* commit the temporary channel we exposed; the consumer's channel
* will be committed later. Note that when exposing a different channel
* when there's no transaction manager, the exposed channel is committed
* on each message, and not based on txSize.
*/
RabbitUtils.commitIfNecessary(channelToUse);
}
}
}
}

View File

@@ -293,6 +293,9 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
*/
@Override
protected void doInitialize() throws Exception {
if (!this.isExposeListenerChannel() && this.transactionManager != null) {
logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
}
initializeProxy();
}
@@ -528,6 +531,14 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
throw t;
}
if (SimpleMessageListenerContainer.this.transactionManager != null) {
/*
* Register the consumer's channel so it will be used by the transaction manager
* if it's an instance of RabbitTransactionManager.
*/
ConnectionFactoryUtils.registerConsumerChannel(consumer.getChannel());
}
// Always better to stop receiving as soon as possible if
// transactional
boolean continuable = false;
@@ -563,6 +574,11 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
+ "Exception summary: " + t);
}
}
finally {
if (SimpleMessageListenerContainer.this.transactionManager != null) {
ConnectionFactoryUtils.unRegisterConsumerChannel();
}
}
// In all cases count down to allow container to progress beyond startup
start.countDown();

View File

@@ -0,0 +1,455 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.listener;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
/**
* @author Gary Russell
* @since 1.1.2
*
*/
public class ExternalTxManagerTests {
/**
* Verifies that an up-stack RabbitTemplate uses the listener's
* channel (MessageListener).
*/
@Test
public void testMessageListener() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
final AtomicReference<Exception> tooManyChannels = new AtomicReference<Exception>();
doAnswer(new Answer<Channel>(){
boolean done;
@Override
public Channel answer(InvocationOnMock invocation) throws Throwable {
if (!done) {
done = true;
return onlyChannel;
}
tooManyChannels.set(new Exception("More than one channel requested"));
Channel channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
return channel;
}
}).when(mockConnection).createChannel();
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[2]);
return null;
}
}).when(onlyChannel)
.basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(Consumer.class));
final CountDownLatch commitLatch = new CountDownLatch(1);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
commitLatch.countDown();
return null;
}
}).when(onlyChannel).txCommit();
final CountDownLatch latch = new CountDownLatch(1);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
container.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
latch.countDown();
}
});
container.setQueueNames("queue");
container.setChannelTransacted(true);
container.setShutdownTimeout(100);
container.setTransactionManager(new DummyTxManager());
container.afterPropertiesSet();
container.start();
consumer.get().handleDelivery("qux", new Envelope(1, false, "foo", "bar"), new BasicProperties(), new byte[] {0});
assertTrue(latch.await(10, TimeUnit.SECONDS));
Exception e = tooManyChannels.get();
if (e != null) {
throw e;
}
verify(mockConnection, Mockito.times(1)).createChannel();
assertTrue(commitLatch.await(10, TimeUnit.SECONDS));
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
container.stop();
}
/**
* Verifies that an up-stack RabbitTemplate uses the listener's
* channel (ChannelAwareMessageListener).
*/
@Test
public void testChannelAwareMessageListener() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
final AtomicReference<Exception> tooManyChannels = new AtomicReference<Exception>();
doAnswer(new Answer<Channel>(){
boolean done;
@Override
public Channel answer(InvocationOnMock invocation) throws Throwable {
if (!done) {
done = true;
return onlyChannel;
}
tooManyChannels.set(new Exception("More than one channel requested"));
Channel channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
return channel;
}
}).when(mockConnection).createChannel();
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[2]);
return null;
}
}).when(onlyChannel)
.basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(Consumer.class));
final CountDownLatch commitLatch = new CountDownLatch(1);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
commitLatch.countDown();
return null;
}
}).when(onlyChannel).txCommit();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
latch.countDown();
}
});
container.setQueueNames("queue");
container.setChannelTransacted(true);
container.setShutdownTimeout(100);
container.setTransactionManager(new DummyTxManager());
container.afterPropertiesSet();
container.start();
consumer.get().handleDelivery("qux", new Envelope(1, false, "foo", "bar"), new BasicProperties(), new byte[] {0});
assertTrue(latch.await(10, TimeUnit.SECONDS));
Exception e = tooManyChannels.get();
if (e != null) {
throw e;
}
verify(mockConnection, Mockito.times(1)).createChannel();
assertTrue(commitLatch.await(10, TimeUnit.SECONDS));
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
container.stop();
assertSame(onlyChannel, exposed.get());
}
/**
* Verifies that an up-stack RabbitTemplate uses the listener's
* channel (ChannelAwareMessageListener). exposeListenerChannel=false
* is ignored (ChannelAwareMessageListener).
*/
@Test
public void testChannelAwareMessageListenerDontExpose() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
final AtomicReference<Exception> tooManyChannels = new AtomicReference<Exception>();
doAnswer(new Answer<Channel>(){
boolean done;
@Override
public Channel answer(InvocationOnMock invocation) throws Throwable {
if (!done) {
done = true;
return onlyChannel;
}
tooManyChannels.set(new Exception("More than one channel requested"));
Channel channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
return channel;
}
}).when(mockConnection).createChannel();
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[2]);
return null;
}
}).when(onlyChannel)
.basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(Consumer.class));
final CountDownLatch commitLatch = new CountDownLatch(1);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
commitLatch.countDown();
return null;
}
}).when(onlyChannel).txCommit();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
latch.countDown();
}
});
container.setQueueNames("queue");
container.setChannelTransacted(true);
container.setExposeListenerChannel(false);
container.setShutdownTimeout(100);
container.setTransactionManager(new DummyTxManager());
container.afterPropertiesSet();
container.start();
consumer.get().handleDelivery("qux", new Envelope(1, false, "foo", "bar"), new BasicProperties(), new byte[] {0});
assertTrue(latch.await(10, TimeUnit.SECONDS));
Exception e = tooManyChannels.get();
if (e != null) {
throw e;
}
verify(mockConnection, Mockito.times(1)).createChannel();
assertTrue(commitLatch.await(10, TimeUnit.SECONDS));
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
container.stop();
assertSame(onlyChannel, exposed.get());
}
/**
* Verifies the proper channel is bound when using a RabbitTransactionManager.
* Previously, the wrong channel was bound. See AMQP-260.
* @throws Exception
*/
@Test
public void testMessageListenerWithRabbitTxManager() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
final AtomicReference<Exception> tooManyChannels = new AtomicReference<Exception>();
doAnswer(new Answer<Channel>(){
boolean done;
@Override
public Channel answer(InvocationOnMock invocation) throws Throwable {
if (!done) {
done = true;
return onlyChannel;
}
tooManyChannels.set(new Exception("More than one channel requested"));
Channel channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
return channel;
}
}).when(mockConnection).createChannel();
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[2]);
return null;
}
}).when(onlyChannel)
.basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(Consumer.class));
final CountDownLatch commitLatch = new CountDownLatch(1);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
commitLatch.countDown();
return null;
}
}).when(onlyChannel).txCommit();
final CountDownLatch latch = new CountDownLatch(1);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
container.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
latch.countDown();
}
});
container.setQueueNames("queue");
container.setChannelTransacted(true);
container.setShutdownTimeout(100);
container.setTransactionManager(new RabbitTransactionManager(cachingConnectionFactory));
container.afterPropertiesSet();
container.start();
consumer.get().handleDelivery("qux", new Envelope(1, false, "foo", "bar"), new BasicProperties(), new byte[] {0});
assertTrue(latch.await(10, TimeUnit.SECONDS));
Exception e = tooManyChannels.get();
if (e != null) {
throw e;
}
verify(mockConnection, Mockito.times(1)).createChannel();
assertTrue(commitLatch.await(10, TimeUnit.SECONDS));
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
container.stop();
}
@SuppressWarnings("serial")
private static class DummyTxManager extends AbstractPlatformTransactionManager {
@Override
protected Object doGetTransaction() throws TransactionException {
return new Object();
}
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
}
@Override
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
}
@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
}
}
}

View File

@@ -0,0 +1,335 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.listener;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
/**
* @author Gary Russell
* @since 1.1.2
*
*/
public class LocallyTransactedTests {
/**
* Verifies that an up-stack RabbitTemplate uses the listener's
* channel (MessageListener).
*/
@Test
public void testMessageListener() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
final AtomicReference<Exception> tooManyChannels = new AtomicReference<Exception>();
doAnswer(new Answer<Channel>(){
boolean done;
@Override
public Channel answer(InvocationOnMock invocation) throws Throwable {
if (!done) {
done = true;
return onlyChannel;
}
tooManyChannels.set(new Exception("More than one channel requested"));
Channel channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
return channel;
}
}).when(mockConnection).createChannel();
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[2]);
return null;
}
}).when(onlyChannel)
.basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(Consumer.class));
final CountDownLatch commitLatch = new CountDownLatch(1);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
commitLatch.countDown();
return null;
}
}).when(onlyChannel).txCommit();
final CountDownLatch latch = new CountDownLatch(1);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
container.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
latch.countDown();
}
});
container.setQueueNames("queue");
container.setChannelTransacted(true);
container.setShutdownTimeout(100);
container.afterPropertiesSet();
container.start();
consumer.get().handleDelivery("qux", new Envelope(1, false, "foo", "bar"), new BasicProperties(), new byte[] {0});
assertTrue(latch.await(10, TimeUnit.SECONDS));
Exception e = tooManyChannels.get();
if (e != null) {
throw e;
}
verify(mockConnection, Mockito.times(1)).createChannel();
assertTrue(commitLatch.await(10, TimeUnit.SECONDS));
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
container.stop();
}
/**
* Verifies that an up-stack RabbitTemplate uses the listener's
* channel (ChannelAwareMessageListener).
*/
@Test
public void testChannelAwareMessageListener() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
final AtomicReference<Exception> tooManyChannels = new AtomicReference<Exception>();
doAnswer(new Answer<Channel>(){
boolean done;
@Override
public Channel answer(InvocationOnMock invocation) throws Throwable {
if (!done) {
done = true;
return onlyChannel;
}
tooManyChannels.set(new Exception("More than one channel requested"));
Channel channel = mock(Channel.class);
when(channel.isOpen()).thenReturn(true);
return channel;
}
}).when(mockConnection).createChannel();
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[2]);
return null;
}
}).when(onlyChannel)
.basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(Consumer.class));
final CountDownLatch commitLatch = new CountDownLatch(1);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
commitLatch.countDown();
return null;
}
}).when(onlyChannel).txCommit();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
latch.countDown();
}
});
container.setQueueNames("queue");
container.setChannelTransacted(true);
container.setShutdownTimeout(100);
container.afterPropertiesSet();
container.start();
consumer.get().handleDelivery("qux", new Envelope(1, false, "foo", "bar"), new BasicProperties(), new byte[] {0});
assertTrue(latch.await(10, TimeUnit.SECONDS));
Exception e = tooManyChannels.get();
if (e != null) {
throw e;
}
verify(mockConnection, Mockito.times(1)).createChannel();
assertTrue(commitLatch.await(10, TimeUnit.SECONDS));
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
container.stop();
assertSame(onlyChannel, exposed.get());
}
/**
* Verifies that the listener channel is not exposed when so configured and
* up-stack RabbitTemplate uses the additional channel.
* created when exposeListenerChannel is false (ChannelAwareMessageListener).
*/
@Test
public void testChannelAwareMessageListenerDontExpose() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
final Channel firstChannel = mock(Channel.class);
when(firstChannel.isOpen()).thenReturn(true);
final Channel secondChannel = mock(Channel.class);
when(secondChannel.isOpen()).thenReturn(true);
final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
final AtomicReference<Exception> tooManyChannels = new AtomicReference<Exception>();
doAnswer(new Answer<Channel>(){
boolean done;
@Override
public Channel answer(InvocationOnMock invocation) throws Throwable {
if (!done) {
done = true;
return firstChannel;
}
return secondChannel;
}
}).when(mockConnection).createChannel();
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[2]);
return null;
}
}).when(firstChannel)
.basicConsume(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(Consumer.class));
final CountDownLatch commitLatch = new CountDownLatch(1);
doAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
commitLatch.countDown();
return null;
}
}).when(firstChannel).txCommit();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
latch.countDown();
}
});
container.setQueueNames("queue");
container.setChannelTransacted(true);
container.setExposeListenerChannel(false);
container.setShutdownTimeout(100);
container.afterPropertiesSet();
container.start();
consumer.get().handleDelivery("qux", new Envelope(1, false, "foo", "bar"), new BasicProperties(), new byte[] {0});
assertTrue(latch.await(10, TimeUnit.SECONDS));
Exception e = tooManyChannels.get();
if (e != null) {
throw e;
}
// once for listener, once for exposed + 0 for template (used bound)
verify(mockConnection, Mockito.times(2)).createChannel();
assertTrue(commitLatch.await(10, TimeUnit.SECONDS));
verify(firstChannel).txCommit();
verify(secondChannel).txCommit();
verify(secondChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
container.stop();
assertSame(secondChannel, exposed.get());
}
}