AMQP-190: prevent channel leak in RabbitTemplate with external transaction

This commit is contained in:
Dave Syer
2011-09-11 12:05:54 -04:00
parent af5380a2b5
commit 7538a6072b
2 changed files with 45 additions and 0 deletions

View File

@@ -250,6 +250,7 @@ public class ConnectionFactoryUtils {
if (status != TransactionSynchronization.STATUS_COMMITTED) {
resourceHolder.rollbackAll();
}
// resourceHolder.setSynchronizedWithTransaction(false);
super.afterCompletion(status);
}

View File

@@ -13,6 +13,13 @@ import org.springframework.amqp.rabbit.test.BrokerTestUtils;
import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster;
import org.springframework.amqp.rabbit.test.RepeatProcessor;
import org.springframework.test.annotation.Repeat;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
public class RabbitTemplatePerformanceIntegrationTests {
@@ -80,4 +87,41 @@ public class RabbitTemplatePerformanceIntegrationTests {
assertEquals("message", result);
}
@Test
@Repeat(2000)
public void testSendAndReceiveExternalTransacted() throws Exception {
template.setChannelTransacted(true);
new TransactionTemplate(new TestTransactionManager()).execute(new TransactionCallback<Void>() {
public Void doInTransaction(TransactionStatus status) {
template.convertAndSend(ROUTE, "message");
return null;
}
});
template.convertAndSend(ROUTE, "message");
String result = (String) template.receiveAndConvert(ROUTE);
assertEquals("message", result);
}
@SuppressWarnings("serial")
private class TestTransactionManager extends AbstractPlatformTransactionManager {
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
}
@Override
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
}
@Override
protected Object doGetTransaction() throws TransactionException {
return new Object();
}
@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
}
}
}