#184 - Restore AutoCommit and IsolationLevel after transaction.

This commit is contained in:
Mark Paluch
2019-09-11 16:05:58 +02:00
parent 62656668ea
commit ddef57af63
5 changed files with 166 additions and 66 deletions

View File

@@ -32,7 +32,7 @@
<postgresql.version>42.2.5</postgresql.version>
<mysql.version>5.1.47</mysql.version>
<jasync.version>1.0.6</jasync.version>
<r2dbc-mysql.version>0.2.0.M2</r2dbc-mysql.version>
<r2dbc-mysql.version>master-SNAPSHOT</r2dbc-mysql.version>
<mssql-jdbc.version>7.1.2.jre8-preview</mssql-jdbc.version>
<r2dbc-releasetrain.version>Arabba-BUILD-SNAPSHOT</r2dbc-releasetrain.version>
<reactive-streams.version>1.0.1</reactive-streams.version>

View File

@@ -227,55 +227,36 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return connection.flatMap(con -> {
return prepareTransactionalConnection(con, definition).then(doBegin(con, definition)).then().doOnSuccess(v -> {
txObject.getConnectionHolder().setTransactionActive(true);
return prepareTransactionalConnection(con, definition, transaction).then(Mono.from(con.beginTransaction()))
.doOnSuccess(v -> {
txObject.getConnectionHolder().setTransactionActive(true);
Duration timeout = determineTimeout(definition);
if (!timeout.isNegative() && !timeout.isZero()) {
txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
}
Duration timeout = determineTimeout(definition);
if (!timeout.isNegative() && !timeout.isZero()) {
txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
}
}).thenReturn(con).onErrorResume(e -> {
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
}
}).thenReturn(con).onErrorResume(e -> {
CannotCreateTransactionException ex = new CannotCreateTransactionException(
"Could not open R2DBC Connection for transaction", e);
CannotCreateTransactionException ex = new CannotCreateTransactionException(
"Could not open R2DBC Connection for transaction", e);
if (txObject.isNewConnectionHolder()) {
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()).doOnTerminate(() -> {
if (txObject.isNewConnectionHolder()) {
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()).doOnTerminate(() -> {
txObject.setConnectionHolder(null, false);
}).then(Mono.error(ex));
}
return Mono.error(ex);
});
txObject.setConnectionHolder(null, false);
}).then(Mono.error(ex));
}
return Mono.error(ex);
});
});
}).then();
}
private Mono<Void> doBegin(Connection con, TransactionDefinition definition) {
Mono<Void> doBegin = Mono.from(con.beginTransaction());
if (definition != null && definition.getIsolationLevel() != -1) {
IsolationLevel isolationLevel = resolveIsolationLevel(definition.getIsolationLevel());
if (isolationLevel != null) {
if (this.logger.isDebugEnabled()) {
this.logger
.debug("Changing isolation level of R2DBC Connection [" + con + "] to " + definition.getIsolationLevel());
}
doBegin = doBegin.then(Mono.from(con.setTransactionIsolationLevel(isolationLevel)));
}
}
return doBegin;
}
/**
* Determine the actual timeout to use for the given definition. Will fall back to this manager's default timeout if
* the transaction definition doesn't specify a non-default value.
@@ -401,18 +382,32 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
// Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isNewConnectionHolder()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Releasing R2DBC Connection [" + con + "] after transaction");
}
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory());
}
} finally {
txObject.getConnectionHolder().clear();
Mono<Void> afterCleanup = Mono.empty();
if (txObject.isMustRestoreAutoCommit()) {
afterCleanup = afterCleanup.then(Mono.from(con.setAutoCommit(true)));
}
return Mono.empty();
if (txObject.getPreviousIsolationLevel() != null) {
afterCleanup = afterCleanup
.then(Mono.from(con.setTransactionIsolationLevel(txObject.getPreviousIsolationLevel())));
}
return afterCleanup.then(Mono.defer(() -> {
try {
if (txObject.isNewConnectionHolder()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Releasing R2DBC Connection [" + con + "] after transaction");
}
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory());
}
} finally {
txObject.getConnectionHolder().clear();
}
return Mono.empty();
}));
});
}
@@ -427,18 +422,51 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
*
* @param con the transactional R2DBC Connection
* @param definition the current transaction definition
* @param definition the transaction object
* @see #setEnforceReadOnly
*/
protected Mono<Void> prepareTransactionalConnection(Connection con, TransactionDefinition definition) {
protected Mono<Void> prepareTransactionalConnection(Connection con, TransactionDefinition definition,
Object transaction) {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
Mono<Void> prepare = Mono.empty();
if (isEnforceReadOnly() && definition.isReadOnly()) {
return Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute()) //
prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute()) //
.flatMapMany(Result::getRowsUpdated) //
.then();
}
return Mono.empty();
// Apply specific isolation level, if any.
IsolationLevel isolationLevelToUse = resolveIsolationLevel(definition.getIsolationLevel());
if (isolationLevelToUse != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
if (this.logger.isDebugEnabled()) {
this.logger
.debug("Changing isolation level of R2DBC Connection [" + con + "] to " + isolationLevelToUse.asSql());
}
IsolationLevel currentIsolation = con.getTransactionIsolationLevel();
if (!currentIsolation.asSql().equalsIgnoreCase(isolationLevelToUse.asSql())) {
txObject.setPreviousIsolationLevel(currentIsolation);
prepare = prepare.then(Mono.from(con.setTransactionIsolationLevel(isolationLevelToUse)));
}
}
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.isAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching R2DBC Connection [" + con + "] to manual commit");
}
prepare = prepare.then(Mono.from(con.setAutoCommit(false)));
}
return prepare;
}
/**
@@ -474,8 +502,14 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
*/
private static class ConnectionFactoryTransactionObject {
private @Nullable ConnectionHolder connectionHolder;
private @Nullable IsolationLevel previousIsolationLevel;
private boolean newConnectionHolder;
private boolean mustRestoreAutoCommit;
void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
setConnectionHolder(connectionHolder);
this.newConnectionHolder = newConnectionHolder;
@@ -489,12 +523,6 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
getConnectionHolder().setRollbackOnly();
}
@Nullable private ConnectionHolder connectionHolder;
@Nullable private IsolationLevel previousIsolationLevel;
private boolean savepointAllowed = false;
public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) {
this.connectionHolder = connectionHolder;
}
@@ -517,12 +545,12 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return this.previousIsolationLevel;
}
public void setSavepointAllowed(boolean savepointAllowed) {
this.savepointAllowed = savepointAllowed;
public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
this.mustRestoreAutoCommit = mustRestoreAutoCommit;
}
public boolean isSavepointAllowed() {
return this.savepointAllowed;
public boolean isMustRestoreAutoCommit() {
return this.mustRestoreAutoCommit;
}
}
}

View File

@@ -26,7 +26,6 @@ import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;
import java.util.concurrent.atomic.AtomicInteger;
@@ -83,6 +82,7 @@ public class R2dbcTransactionManagerUnitTests {
.verifyComplete();
assertThat(commits).hasValue(1);
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction();
verify(connectionMock).commitTransaction();
verify(connectionMock).close();
@@ -98,6 +98,7 @@ public class R2dbcTransactionManagerUnitTests {
public void appliesIsolationLevel() {
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
when(connectionMock.getTransactionIsolationLevel()).thenReturn(IsolationLevel.READ_COMMITTED);
when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty());
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
@@ -111,10 +112,74 @@ public class R2dbcTransactionManagerUnitTests {
.verifyComplete();
verify(connectionMock).beginTransaction();
verify(connectionMock).setTransactionIsolationLevel(IsolationLevel.READ_COMMITTED);
verify(connectionMock).setTransactionIsolationLevel(IsolationLevel.SERIALIZABLE);
verify(connectionMock).commitTransaction();
verify(connectionMock).close();
verifyNoMoreInteractions(connectionMock);
}
@Test // gh-184
public void doesNotSetIsolationLevelIfMatch() {
when(connectionMock.getTransactionIsolationLevel()).thenReturn(IsolationLevel.READ_COMMITTED);
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(operator::transactional) //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
verify(connectionMock).beginTransaction();
verify(connectionMock, never()).setTransactionIsolationLevel(any());
verify(connectionMock).commitTransaction();
}
@Test // gh-184
public void doesNotSetAutoCommitDisabled() {
when(connectionMock.isAutoCommit()).thenReturn(false);
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(operator::transactional) //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
verify(connectionMock).beginTransaction();
verify(connectionMock, never()).setAutoCommit(anyBoolean());
verify(connectionMock).commitTransaction();
}
@Test // gh-184
public void restoresAutoCommit() {
when(connectionMock.isAutoCommit()).thenReturn(true);
when(connectionMock.setAutoCommit(anyBoolean())).thenReturn(Mono.empty());
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(operator::transactional) //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
verify(connectionMock).beginTransaction();
verify(connectionMock).setAutoCommit(false);
verify(connectionMock).setAutoCommit(true);
verify(connectionMock).commitTransaction();
verify(connectionMock).close();
}
@Test // gh-107
@@ -137,6 +202,7 @@ public class R2dbcTransactionManagerUnitTests {
.expectNextCount(1) //
.verifyComplete();
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction();
verify(connectionMock).createStatement("SET TRANSACTION READ ONLY");
verify(connectionMock).commitTransaction();
@@ -161,6 +227,7 @@ public class R2dbcTransactionManagerUnitTests {
.as(StepVerifier::create) //
.verifyError();
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction();
verify(connectionMock).createStatement("foo");
verify(connectionMock).commitTransaction();
@@ -189,6 +256,7 @@ public class R2dbcTransactionManagerUnitTests {
assertThat(commits).hasValue(0);
assertThat(rollbacks).hasValue(1);
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction();
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();
@@ -218,6 +286,7 @@ public class R2dbcTransactionManagerUnitTests {
}).as(StepVerifier::create) //
.verifyComplete();
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction();
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();

View File

@@ -31,6 +31,7 @@ import org.springframework.data.r2dbc.testing.MySqlTestSupport;
*
* @author Mark Paluch
*/
@Ignore("https://github.com/jasync-sql/jasync-sql/issues/150")
public class JasyncMySqlDatabaseClientIntegrationTests extends AbstractDatabaseClientIntegrationTests {
@ClassRule public static final ExternalDatabase database = MySqlTestSupport.database();

View File

@@ -23,6 +23,7 @@ import java.time.Duration;
import javax.sql.DataSource;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.springframework.data.r2dbc.testing.ExternalDatabase;
import org.springframework.data.r2dbc.testing.MySqlTestSupport;
@@ -32,6 +33,7 @@ import org.springframework.data.r2dbc.testing.MySqlTestSupport;
*
* @author Mark Paluch
*/
@Ignore("https://github.com/jasync-sql/jasync-sql/issues/150")
public class JasyncMySqlTransactionalDatabaseClientIntegrationTests
extends AbstractTransactionalDatabaseClientIntegrationTests {