From ddef57af632df8b94320e44f041df91af5ca5f2c Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 11 Sep 2019 16:05:58 +0200 Subject: [PATCH] #184 - Restore AutoCommit and IsolationLevel after transaction. --- pom.xml | 2 +- .../R2dbcTransactionManager.java | 154 +++++++++++------- .../R2dbcTransactionManagerUnitTests.java | 73 ++++++++- ...ncMySqlDatabaseClientIntegrationTests.java | 1 + ...ctionalDatabaseClientIntegrationTests.java | 2 + 5 files changed, 166 insertions(+), 66 deletions(-) diff --git a/pom.xml b/pom.xml index a55a313..01bca26 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ 42.2.5 5.1.47 1.0.6 - 0.2.0.M2 + master-SNAPSHOT 7.1.2.jre8-preview Arabba-BUILD-SNAPSHOT 1.0.1 diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManager.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManager.java index 3d95728..7a48a08 100644 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManager.java +++ b/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManager.java @@ -227,55 +227,36 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager return connection.flatMap(con -> { - return prepareTransactionalConnection(con, definition).then(doBegin(con, definition)).then().doOnSuccess(v -> { - txObject.getConnectionHolder().setTransactionActive(true); + return prepareTransactionalConnection(con, definition, transaction).then(Mono.from(con.beginTransaction())) + .doOnSuccess(v -> { + txObject.getConnectionHolder().setTransactionActive(true); - Duration timeout = determineTimeout(definition); - if (!timeout.isNegative() && !timeout.isZero()) { - txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis()); - } + Duration timeout = determineTimeout(definition); + if (!timeout.isNegative() && !timeout.isZero()) { + txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis()); + } - // Bind the connection holder to the thread. - if (txObject.isNewConnectionHolder()) { - synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder()); - } - }).thenReturn(con).onErrorResume(e -> { + // Bind the connection holder to the thread. + if (txObject.isNewConnectionHolder()) { + synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder()); + } + }).thenReturn(con).onErrorResume(e -> { - CannotCreateTransactionException ex = new CannotCreateTransactionException( - "Could not open R2DBC Connection for transaction", e); + CannotCreateTransactionException ex = new CannotCreateTransactionException( + "Could not open R2DBC Connection for transaction", e); - if (txObject.isNewConnectionHolder()) { - return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()).doOnTerminate(() -> { + if (txObject.isNewConnectionHolder()) { + return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()).doOnTerminate(() -> { - txObject.setConnectionHolder(null, false); - }).then(Mono.error(ex)); - } - return Mono.error(ex); - }); + txObject.setConnectionHolder(null, false); + }).then(Mono.error(ex)); + } + return Mono.error(ex); + }); }); }).then(); } - private Mono doBegin(Connection con, TransactionDefinition definition) { - - Mono doBegin = Mono.from(con.beginTransaction()); - - if (definition != null && definition.getIsolationLevel() != -1) { - - IsolationLevel isolationLevel = resolveIsolationLevel(definition.getIsolationLevel()); - - if (isolationLevel != null) { - if (this.logger.isDebugEnabled()) { - this.logger - .debug("Changing isolation level of R2DBC Connection [" + con + "] to " + definition.getIsolationLevel()); - } - doBegin = doBegin.then(Mono.from(con.setTransactionIsolationLevel(isolationLevel))); - } - } - - return doBegin; - } - /** * Determine the actual timeout to use for the given definition. Will fall back to this manager's default timeout if * the transaction definition doesn't specify a non-default value. @@ -401,18 +382,32 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager // Reset connection. Connection con = txObject.getConnectionHolder().getConnection(); - try { - if (txObject.isNewConnectionHolder()) { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Releasing R2DBC Connection [" + con + "] after transaction"); - } - return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()); - } - } finally { - txObject.getConnectionHolder().clear(); + Mono afterCleanup = Mono.empty(); + + if (txObject.isMustRestoreAutoCommit()) { + afterCleanup = afterCleanup.then(Mono.from(con.setAutoCommit(true))); } - return Mono.empty(); + if (txObject.getPreviousIsolationLevel() != null) { + afterCleanup = afterCleanup + .then(Mono.from(con.setTransactionIsolationLevel(txObject.getPreviousIsolationLevel()))); + } + + return afterCleanup.then(Mono.defer(() -> { + + try { + if (txObject.isNewConnectionHolder()) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Releasing R2DBC Connection [" + con + "] after transaction"); + } + return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()); + } + } finally { + txObject.getConnectionHolder().clear(); + } + + return Mono.empty(); + })); }); } @@ -427,18 +422,51 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager * * @param con the transactional R2DBC Connection * @param definition the current transaction definition + * @param definition the transaction object * @see #setEnforceReadOnly */ - protected Mono prepareTransactionalConnection(Connection con, TransactionDefinition definition) { + protected Mono prepareTransactionalConnection(Connection con, TransactionDefinition definition, + Object transaction) { + + ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction; + + Mono prepare = Mono.empty(); if (isEnforceReadOnly() && definition.isReadOnly()) { - return Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute()) // + prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute()) // .flatMapMany(Result::getRowsUpdated) // .then(); } - return Mono.empty(); + // Apply specific isolation level, if any. + IsolationLevel isolationLevelToUse = resolveIsolationLevel(definition.getIsolationLevel()); + if (isolationLevelToUse != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { + + if (this.logger.isDebugEnabled()) { + this.logger + .debug("Changing isolation level of R2DBC Connection [" + con + "] to " + isolationLevelToUse.asSql()); + } + IsolationLevel currentIsolation = con.getTransactionIsolationLevel(); + if (!currentIsolation.asSql().equalsIgnoreCase(isolationLevelToUse.asSql())) { + + txObject.setPreviousIsolationLevel(currentIsolation); + prepare = prepare.then(Mono.from(con.setTransactionIsolationLevel(isolationLevelToUse))); + } + } + + // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, + // so we don't want to do it unnecessarily (for example if we've explicitly + // configured the connection pool to set it already). + if (con.isAutoCommit()) { + txObject.setMustRestoreAutoCommit(true); + if (this.logger.isDebugEnabled()) { + this.logger.debug("Switching R2DBC Connection [" + con + "] to manual commit"); + } + prepare = prepare.then(Mono.from(con.setAutoCommit(false))); + } + + return prepare; } /** @@ -474,8 +502,14 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager */ private static class ConnectionFactoryTransactionObject { + private @Nullable ConnectionHolder connectionHolder; + + private @Nullable IsolationLevel previousIsolationLevel; + private boolean newConnectionHolder; + private boolean mustRestoreAutoCommit; + void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) { setConnectionHolder(connectionHolder); this.newConnectionHolder = newConnectionHolder; @@ -489,12 +523,6 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager getConnectionHolder().setRollbackOnly(); } - @Nullable private ConnectionHolder connectionHolder; - - @Nullable private IsolationLevel previousIsolationLevel; - - private boolean savepointAllowed = false; - public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) { this.connectionHolder = connectionHolder; } @@ -517,12 +545,12 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager return this.previousIsolationLevel; } - public void setSavepointAllowed(boolean savepointAllowed) { - this.savepointAllowed = savepointAllowed; + public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) { + this.mustRestoreAutoCommit = mustRestoreAutoCommit; } - public boolean isSavepointAllowed() { - return this.savepointAllowed; + public boolean isMustRestoreAutoCommit() { + return this.mustRestoreAutoCommit; } } } diff --git a/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java b/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java index 465fde3..3191606 100644 --- a/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java +++ b/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java @@ -26,7 +26,6 @@ import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.Statement; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import reactor.util.function.Tuple2; import java.util.concurrent.atomic.AtomicInteger; @@ -83,6 +82,7 @@ public class R2dbcTransactionManagerUnitTests { .verifyComplete(); assertThat(commits).hasValue(1); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(); verify(connectionMock).commitTransaction(); verify(connectionMock).close(); @@ -98,6 +98,7 @@ public class R2dbcTransactionManagerUnitTests { public void appliesIsolationLevel() { when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); + when(connectionMock.getTransactionIsolationLevel()).thenReturn(IsolationLevel.READ_COMMITTED); when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty()); DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); @@ -111,10 +112,74 @@ public class R2dbcTransactionManagerUnitTests { .verifyComplete(); verify(connectionMock).beginTransaction(); + verify(connectionMock).setTransactionIsolationLevel(IsolationLevel.READ_COMMITTED); verify(connectionMock).setTransactionIsolationLevel(IsolationLevel.SERIALIZABLE); verify(connectionMock).commitTransaction(); verify(connectionMock).close(); - verifyNoMoreInteractions(connectionMock); + } + + @Test // gh-184 + public void doesNotSetIsolationLevelIfMatch() { + + when(connectionMock.getTransactionIsolationLevel()).thenReturn(IsolationLevel.READ_COMMITTED); + when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); + + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + + ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(operator::transactional) // + .as(StepVerifier::create) // + .expectNextCount(1) // + .verifyComplete(); + + verify(connectionMock).beginTransaction(); + verify(connectionMock, never()).setTransactionIsolationLevel(any()); + verify(connectionMock).commitTransaction(); + } + + @Test // gh-184 + public void doesNotSetAutoCommitDisabled() { + + when(connectionMock.isAutoCommit()).thenReturn(false); + when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); + + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + + ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(operator::transactional) // + .as(StepVerifier::create) // + .expectNextCount(1) // + .verifyComplete(); + + verify(connectionMock).beginTransaction(); + verify(connectionMock, never()).setAutoCommit(anyBoolean()); + verify(connectionMock).commitTransaction(); + } + + @Test // gh-184 + public void restoresAutoCommit() { + + when(connectionMock.isAutoCommit()).thenReturn(true); + when(connectionMock.setAutoCommit(anyBoolean())).thenReturn(Mono.empty()); + when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); + + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + + ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(operator::transactional) // + .as(StepVerifier::create) // + .expectNextCount(1) // + .verifyComplete(); + + verify(connectionMock).beginTransaction(); + verify(connectionMock).setAutoCommit(false); + verify(connectionMock).setAutoCommit(true); + verify(connectionMock).commitTransaction(); + verify(connectionMock).close(); } @Test // gh-107 @@ -137,6 +202,7 @@ public class R2dbcTransactionManagerUnitTests { .expectNextCount(1) // .verifyComplete(); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(); verify(connectionMock).createStatement("SET TRANSACTION READ ONLY"); verify(connectionMock).commitTransaction(); @@ -161,6 +227,7 @@ public class R2dbcTransactionManagerUnitTests { .as(StepVerifier::create) // .verifyError(); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(); verify(connectionMock).createStatement("foo"); verify(connectionMock).commitTransaction(); @@ -189,6 +256,7 @@ public class R2dbcTransactionManagerUnitTests { assertThat(commits).hasValue(0); assertThat(rollbacks).hasValue(1); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(); verify(connectionMock).rollbackTransaction(); verify(connectionMock).close(); @@ -218,6 +286,7 @@ public class R2dbcTransactionManagerUnitTests { }).as(StepVerifier::create) // .verifyComplete(); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(); verify(connectionMock).rollbackTransaction(); verify(connectionMock).close(); diff --git a/src/test/java/org/springframework/data/r2dbc/core/JasyncMySqlDatabaseClientIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/core/JasyncMySqlDatabaseClientIntegrationTests.java index 65b062c..6a9696a 100644 --- a/src/test/java/org/springframework/data/r2dbc/core/JasyncMySqlDatabaseClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/core/JasyncMySqlDatabaseClientIntegrationTests.java @@ -31,6 +31,7 @@ import org.springframework.data.r2dbc.testing.MySqlTestSupport; * * @author Mark Paluch */ +@Ignore("https://github.com/jasync-sql/jasync-sql/issues/150") public class JasyncMySqlDatabaseClientIntegrationTests extends AbstractDatabaseClientIntegrationTests { @ClassRule public static final ExternalDatabase database = MySqlTestSupport.database(); diff --git a/src/test/java/org/springframework/data/r2dbc/core/JasyncMySqlTransactionalDatabaseClientIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/core/JasyncMySqlTransactionalDatabaseClientIntegrationTests.java index 68ad4ed..c5230ba 100644 --- a/src/test/java/org/springframework/data/r2dbc/core/JasyncMySqlTransactionalDatabaseClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/core/JasyncMySqlTransactionalDatabaseClientIntegrationTests.java @@ -23,6 +23,7 @@ import java.time.Duration; import javax.sql.DataSource; import org.junit.ClassRule; +import org.junit.Ignore; import org.springframework.data.r2dbc.testing.ExternalDatabase; import org.springframework.data.r2dbc.testing.MySqlTestSupport; @@ -32,6 +33,7 @@ import org.springframework.data.r2dbc.testing.MySqlTestSupport; * * @author Mark Paluch */ +@Ignore("https://github.com/jasync-sql/jasync-sql/issues/150") public class JasyncMySqlTransactionalDatabaseClientIntegrationTests extends AbstractTransactionalDatabaseClientIntegrationTests {