AMQP-190 Memory Leak With Tx and RabbitTemplate

When the RabbitTemplate is invoked with an existing transaction,
the channel is bound to the thread, and

  ConnectionFactoryUtils.releaseResources(resourceHolder);

is called after processing. There was a commented-out line
in ConnectionFactoryUtils.RabbitResourceSynchronization.afterCompletion()
that would have reset the synchronized state so that
releaseResouces() would "close" the chanel/connection (return
them to the caching factory).

Being commented out, the channel was never removed, and each
transactional template call grabbed a new channel.

However, uncommenting causes issues with the listener container
because it continues to use the channel, so it must not
be closed (made available for reuse).

Added code to reset synch, by default, but not for the listener.

The listener sets a boolean releaseAfterCompletion in the
ResourceHolder to false so that the channel remains and
is not closed (logically or otherwise).
This commit is contained in:
Gary Russell
2012-07-13 18:53:06 -04:00
committed by Oleg Zhurakousky
parent 22c0a59c7f
commit 3573f9a598
4 changed files with 126 additions and 30 deletions

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -26,13 +26,14 @@ import com.rabbitmq.client.Channel;
/**
* Helper class for managing a Spring based Rabbit {@link org.springframework.amqp.rabbit.connection.ConnectionFactory},
* in particular for obtaining transactional Rabbit resources for a given ConnectionFactory.
*
*
* <p>
* Mainly for internal use within the framework. Used by {@link org.springframework.amqp.rabbit.core.RabbitTemplate} as
* well as {@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer}.
*
*
* @author Mark Fisher
* @author Dave Syer
* @author Gary Russell
*/
public class ConnectionFactoryUtils {
@@ -161,7 +162,7 @@ public class ConnectionFactoryUtils {
}
/**
*
*
*/
public static void registerDeliveryTag(ConnectionFactory connectionFactory, Channel channel, Long tag)
throws IOException {
@@ -237,10 +238,12 @@ public class ConnectionFactoryUtils {
this.transacted = transacted;
}
@Override
protected boolean shouldReleaseBeforeCompletion() {
return !this.transacted;
}
@Override
protected void processResourceAfterCommit(RabbitResourceHolder resourceHolder) {
resourceHolder.commitAll();
}
@@ -250,10 +253,13 @@ public class ConnectionFactoryUtils {
if (status != TransactionSynchronization.STATUS_COMMITTED) {
resourceHolder.rollbackAll();
}
// resourceHolder.setSynchronizedWithTransaction(false);
if (resourceHolder.isReleaseAfterCompletion()) {
resourceHolder.setSynchronizedWithTransaction(false);
}
super.afterCompletion(status);
}
@Override
protected void releaseResource(RabbitResourceHolder resourceHolder, Object resourceKey) {
ConnectionFactoryUtils.releaseResources(resourceHolder);
}

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -35,13 +35,14 @@ import com.rabbitmq.client.Channel;
/**
* Rabbit resource holder, wrapping a RabbitMQ Connection and Channel. RabbitTransactionManager binds instances of this
* class to the thread, for a given Rabbit ConnectionFactory.
*
*
* <p>
* Note: This is an SPI class, not intended to be used by applications.
*
*
* @author Mark Fisher
* @author Dave Syer
*
* @author Gary Russell
*
* @see RabbitTransactionManager
* @see RabbitTemplate
*/
@@ -61,6 +62,8 @@ public class RabbitResourceHolder extends ResourceHolderSupport {
private boolean transactional;
private boolean releaseAfterCompletion = true;
/**
* Create a new RabbitResourceHolder that is open for resources to be added.
*/
@@ -70,15 +73,25 @@ public class RabbitResourceHolder extends ResourceHolderSupport {
/**
* @param channel a channel to add
*/
public RabbitResourceHolder(Channel channel) {
public RabbitResourceHolder(Channel channel, boolean releaseAfterCompletion) {
this();
addChannel(channel);
this.releaseAfterCompletion = releaseAfterCompletion;
}
public final boolean isFrozen() {
return this.frozen;
}
/**
* Whether the resources should be released after transaction completion.
* Default true. Listener containers set to false because the listener continues
* to use the channel.
*/
public boolean isReleaseAfterCompletion() {
return releaseAfterCompletion;
}
public final void addConnection(Connection connection) {
Assert.isTrue(!this.frozen, "Cannot add Connection because RabbitResourceHolder is frozen");
Assert.notNull(connection, "Connection must not be null");

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2002-2011 the original author or authors.
*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -52,6 +52,7 @@ import com.rabbitmq.client.Channel;
* @author Mark Pollack
* @author Mark Fisher
* @author Dave Syer
* @author Gary Russell
* @since 1.0
*/
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
@@ -117,7 +118,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
/**
* Create a listener container from the connection factory (mandatory).
*
*
* @param connectionFactory the {@link ConnectionFactory}
*/
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
@@ -134,7 +135,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
* separate advice is created for the transaction and applied first in the chain. In that case the advice chain
* provided here should not contain a transaction interceptor (otherwise two transactions would be be applied).
* </p>
*
*
* @param adviceChain the advice chain to set
*/
public void setAdviceChain(Advice[] adviceChain) {
@@ -170,7 +171,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
* closed. If any workers are active when the shutdown signal comes they will be allowed to finish processing as
* long as they can finish within this timeout. Otherwise the connection is closed and messages remain unacked (if
* the channel is transactional). Defaults to 5 seconds.
*
*
* @param shutdownTimeout the shutdown timeout to set
*/
public void setShutdownTimeout(long shutdownTimeout) {
@@ -185,7 +186,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
/**
* Tells the broker how many messages to send to each consumer in a single request. Often this can be set quite high
* to improve throughput. It should be greater than or equal to {@link #setTxSize(int) the transaction size}.
*
*
* @param prefetchCount the prefetch count
*/
public void setPrefetchCount(int prefetchCount) {
@@ -195,7 +196,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
/**
* Tells the container how many messages to process in a single transaction (if the channel is transactional). For
* best results it should be less than or equal to {@link #setPrefetchCount(int) the prefetch count}.
*
*
* @param txSize the transaction size
*/
public void setTxSize(int txSize) {
@@ -287,9 +288,10 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
/**
* Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated
* MessageConsumer.
*
*
* @throws Exception
*/
@Override
protected void doInitialize() throws Exception {
initializeProxy();
}
@@ -302,9 +304,10 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
/**
* Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
* to this container's task executor.
*
*
* @throws Exception
*/
@Override
protected void doStart() throws Exception {
super.doStart();
synchronized (this.consumersMonitor) {
@@ -336,6 +339,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
}
}
@Override
protected void doStop() {
shutdown();
super.doStop();
@@ -383,6 +387,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
return count;
}
@Override
protected boolean isChannelLocallyTransacted(Channel channel) {
return super.isChannelLocallyTransacted(channel) && this.transactionManager == null;
}
@@ -429,7 +434,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
.execute(new TransactionCallback<Boolean>() {
public Boolean doInTransaction(TransactionStatus status) {
ConnectionFactoryUtils.bindResourceToTransaction(
new RabbitResourceHolder(consumer.getChannel()), getConnectionFactory(), true);
new RabbitResourceHolder(consumer.getChannel(), false), getConnectionFactory(), true);
try {
return doReceiveAndExecute(consumer);
} catch (RuntimeException e) {
@@ -494,7 +499,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
/**
* Retrieve the fatal startup exception if this processor completely failed to locate the broker resources it
* needed. Blocks up to 60 seconds waiting (but should always return promptly in normal circumstances).
*
*
* @return a startup exception if there was one
* @throws TimeoutException if the consumer hasn't started
* @throws InterruptedException if the consumer startup is interrupted
@@ -590,7 +595,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
/**
* Wait for a period determined by the {@link #setRecoveryInterval(long) recoveryInterval} to give the container a
* chance to recover from consumer startup failure, e.g. if the broker is down.
*
*
* @param t the exception that stopped the startup
* @throws Exception if the shared connection still can't be established
*/

View File

@@ -17,12 +17,30 @@ package org.springframework.amqp.rabbit.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.ExecutorService;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author Gary Russell
@@ -31,6 +49,60 @@ import org.springframework.amqp.utils.SerializationUtils;
*/
public class RabbitTemplateTests {
@Test
public void returnConnectionAfterCommit() throws Exception {
@SuppressWarnings("serial")
TransactionTemplate txTemplate = new TransactionTemplate(new AbstractPlatformTransactionManager() {
@Override
protected Object doGetTransaction() throws TransactionException {
return new Object();
}
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
}
@Override
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
}
@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
}
});
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
Channel mockChannel = mock(Channel.class);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
when(mockConnection.createChannel()).thenReturn(mockChannel);
when(mockChannel.isOpen()).thenReturn(true);
final RabbitTemplate template = new RabbitTemplate(new CachingConnectionFactory(mockConnectionFactory));
template.setChannelTransacted(true);
txTemplate.execute(new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
template.convertAndSend("foo", "bar");
return null;
}
});
txTemplate.execute(new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
template.convertAndSend("baz", "qux");
return null;
}
});
verify(mockConnectionFactory, Mockito.times(1)).newConnection(Mockito.any(ExecutorService.class));
// ensure we used the same channel
verify(mockConnection, Mockito.times(1)).createChannel();
}
@Test
public void testConvertbytes() {
RabbitTemplate template = new RabbitTemplate();