From 3c3277eac40882fa444d0b7f6d0aba4d0a661ffa Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 15 Jul 2019 13:39:23 +0200 Subject: [PATCH] =?UTF-8?q?#124=20-=20Remove=20deprecated=20DatabaseClient?= =?UTF-8?q?.execute().sql(=E2=80=A6)=20and=20TransactionalDatabaseClient.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/asciidoc/reference/r2dbc-core.adoc | 2 +- src/main/asciidoc/reference/r2dbc-sql.adoc | 20 +- .../reference/r2dbc-transactions.adoc | 8 +- .../ConnectionFactoryUtils.java | 150 ++---------- .../DefaultTransactionResources.java | 53 ----- .../R2dbcTransactionManager.java | 43 +++- .../R2dbcTransactionObjectSupport.java | 66 ------ .../ReactiveTransactionSynchronization.java | 87 ------- .../SingletonConnectionFactory.java | 91 -------- ...ransactionAwareConnectionFactoryProxy.java | 24 +- .../TransactionResources.java | 61 ----- .../init/DatabasePopulatorUtils.java | 3 +- .../data/r2dbc/core/DatabaseClient.java | 50 +--- .../r2dbc/core/DefaultDatabaseClient.java | 30 +-- .../DefaultTransactionalDatabaseClient.java | 150 ------------ ...ultTransactionalDatabaseClientBuilder.java | 105 --------- .../core/TransactionalDatabaseClient.java | 217 ------------------ .../data/r2dbc/config/H2IntegrationTests.java | 2 +- .../ConnectionFactoryUtilsUnitTests.java | 117 ---------- .../R2dbcTransactionManagerUnitTests.java | 6 +- ...nAwareConnectionFactoryProxyUnitTests.java | 70 +++--- ...bstractDatabaseClientIntegrationTests.java | 4 +- ...ctionalDatabaseClientIntegrationTests.java | 119 +++------- ...ctionalDatabaseClientIntegrationTests.java | 13 +- ...ctionalDatabaseClientIntegrationTests.java | 4 +- ...ctionalDatabaseClientIntegrationTests.java | 4 +- ...stractR2dbcRepositoryIntegrationTests.java | 23 +- .../H2R2dbcRepositoryIntegrationTests.java | 2 + .../MySqlR2dbcRepositoryIntegrationTests.java | 2 + ...stgresR2dbcRepositoryIntegrationTests.java | 2 + ...ServerR2dbcRepositoryIntegrationTests.java | 2 + 31 files changed, 177 insertions(+), 1353 deletions(-) delete mode 100644 src/main/java/org/springframework/data/r2dbc/connectionfactory/DefaultTransactionResources.java delete mode 100644 src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionObjectSupport.java delete mode 100644 src/main/java/org/springframework/data/r2dbc/connectionfactory/ReactiveTransactionSynchronization.java delete mode 100644 src/main/java/org/springframework/data/r2dbc/connectionfactory/SingletonConnectionFactory.java delete mode 100644 src/main/java/org/springframework/data/r2dbc/connectionfactory/TransactionResources.java delete mode 100644 src/main/java/org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClient.java delete mode 100644 src/main/java/org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClientBuilder.java delete mode 100644 src/main/java/org/springframework/data/r2dbc/core/TransactionalDatabaseClient.java delete mode 100644 src/test/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtilsUnitTests.java diff --git a/src/main/asciidoc/reference/r2dbc-core.adoc b/src/main/asciidoc/reference/r2dbc-core.adoc index 318fcd5..4f60f7a 100644 --- a/src/main/asciidoc/reference/r2dbc-core.adoc +++ b/src/main/asciidoc/reference/r2dbc-core.adoc @@ -139,7 +139,7 @@ public class R2dbcApp { DatabaseClient client = DatabaseClient.create(connectionFactory); - client.sql("CREATE TABLE person" + + client.execute("CREATE TABLE person" + "(id VARCHAR(255) PRIMARY KEY," + "name VARCHAR(255)," + "age INT)") diff --git a/src/main/asciidoc/reference/r2dbc-sql.adoc b/src/main/asciidoc/reference/r2dbc-sql.adoc index d4e8fea..f5b4bdc 100644 --- a/src/main/asciidoc/reference/r2dbc-sql.adoc +++ b/src/main/asciidoc/reference/r2dbc-sql.adoc @@ -6,7 +6,7 @@ The following example shows what you need to include for minimal but fully funct [source,java] ---- -Mono completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);") +Mono completion = client.execute("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);") .then(); ---- @@ -14,7 +14,7 @@ Mono completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY It exposes intermediate, continuation, and terminal methods at each stage of the execution specification. The example above uses `then()` to return a completion `Publisher` that completes as soon as the query (or queries, if the SQL query contains multiple statements) completes. -NOTE: `sql(…)` accepts either the SQL query string or a query `Supplier` to defer the actual query creation until execution. +NOTE: `execute(…)` accepts either the SQL query string or a query `Supplier` to defer the actual query creation until execution. [[r2dbc.datbaseclient.queries]] == Running Queries @@ -26,7 +26,7 @@ The following example shows an `UPDATE` statement that returns the number of upd [source,java] ---- -Mono affectedRows = client.sql("UPDATE person SET name = 'Joe'") +Mono affectedRows = client.execute("UPDATE person SET name = 'Joe'") .fetch().rowsUpdated(); ---- @@ -36,7 +36,7 @@ You might have noticed the use of `fetch()` in the previous example. [source,java] ---- -Mono> first = client.sql("SELECT id, name FROM person") +Mono> first = client.execute("SELECT id, name FROM person") .fetch().first(); ---- @@ -52,7 +52,7 @@ You can consume data with the following operators: [source,java] ---- -Flux all = client.sql("SELECT id, name FROM mytable") +Flux all = client.execute("SELECT id, name FROM mytable") .as(Person.class) .fetch().all(); ---- @@ -69,7 +69,7 @@ The following example extracts the `id` column and emits its value: [source,java] ---- -Flux names = client.sql("SELECT name FROM person") +Flux names = client.execute("SELECT name FROM person") .map((row, rowMetadata) -> row.get("id", String.class)) .all(); ---- @@ -90,7 +90,7 @@ A typical application requires parameterized SQL statements to select or update These are typically `SELECT` statements constrained by a `WHERE` clause or `INSERT`/`UPDATE` statements accepting input parameters. Parameterized statements bear the risk of SQL injection if parameters are not escaped properly. `DatabaseClient` leverages R2DBC's Bind API to eliminate the risk of SQL injection for query parameters. -You can provide a parameterized SQL statement with the `sql(…)` operator and bind parameters to the actual `Statement`. +You can provide a parameterized SQL statement with the `execute(…)` operator and bind parameters to the actual `Statement`. Your R2DBC driver then executes the statement using prepared statements and parameter substitution. Parameter binding supports various binding strategies: @@ -102,7 +102,7 @@ The following example shows parameter binding for a query: [source,java] ---- -db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)") +db.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)") .bind("id", "joe") .bind("name", "Joe") .bind("age", 34); @@ -140,7 +140,7 @@ List tuples = new ArrayList<>(); tuples.add(new Object[] {"John", 35}); tuples.add(new Object[] {"Ann", 50}); -db.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)") +db.execute("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)") .bind("tuples", tuples); ---- @@ -150,6 +150,6 @@ A simpler variant using `IN` predicates: [source,java] ---- -db.sql("SELECT id, name, state FROM table WHERE age IN (:ages)") +db.execute("SELECT id, name, state FROM table WHERE age IN (:ages)") .bind("ages", Arrays.asList(35, 50)); ---- diff --git a/src/main/asciidoc/reference/r2dbc-transactions.adoc b/src/main/asciidoc/reference/r2dbc-transactions.adoc index 1b2087a..94ddc3c 100644 --- a/src/main/asciidoc/reference/r2dbc-transactions.adoc +++ b/src/main/asciidoc/reference/r2dbc-transactions.adoc @@ -18,12 +18,12 @@ TransactionalOperator operator = TransactionalOperator.create(tm); <1> DatabaseClient client = DatabaseClient.create(connectionFactory); -Mono atomicOperation = client.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)") +Mono atomicOperation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)") .bind("id", "joe") .bind("name", "Joe") .bind("age", 34) .fetch().rowsUpdated() - .then(client.sql("INSERT INTO contacts (id, name) VALUES(:id, :name)") + .then(client.execute("INSERT INTO contacts (id, name) VALUES(:id, :name)") .bind("id", "joe") .bind("name", "Joe") .fetch().rowsUpdated()) @@ -69,12 +69,12 @@ class MyService { @Transactional public Mono insertPerson() { - return client.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)") + return client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)") .bind("id", "joe") .bind("name", "Joe") .bind("age", 34) .fetch().rowsUpdated() - .then(client.sql("INSERT INTO contacts (id, name) VALUES(:id, :name)") + .then(client.execute("INSERT INTO contacts (id, name) VALUES(:id, :name)") .bind("id", "joe") .bind("name", "Joe") .fetch().rowsUpdated()) diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtils.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtils.java index 49f01c9..9533b52 100644 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtils.java +++ b/src/main/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtils.java @@ -23,7 +23,6 @@ import reactor.util.function.Tuples; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; import org.springframework.core.Ordered; import org.springframework.dao.DataAccessResourceFailureException; @@ -67,7 +66,7 @@ public abstract class ConnectionFactoryUtils { * @throws DataAccessResourceFailureException if the attempt to get a {@link io.r2dbc.spi.Connection} failed * @see #releaseConnection */ - public static Mono> getConnection(ConnectionFactory connectionFactory) { + public static Mono getConnection(ConnectionFactory connectionFactory) { return doGetConnection(connectionFactory) .onErrorMap(e -> new DataAccessResourceFailureException("Failed to obtain R2DBC Connection", e)); } @@ -82,7 +81,7 @@ public abstract class ConnectionFactoryUtils { * @param connectionFactory the {@link ConnectionFactory} to obtain Connections from. * @return a R2DBC {@link io.r2dbc.spi.Connection} from the given {@link ConnectionFactory}. */ - public static Mono> doGetConnection(ConnectionFactory connectionFactory) { + public static Mono doGetConnection(ConnectionFactory connectionFactory) { Assert.notNull(connectionFactory, "ConnectionFactory must not be null!"); @@ -138,56 +137,11 @@ public abstract class ConnectionFactoryUtils { return con; }) // - .map(conn -> Tuples.of(conn, connectionFactory)) // .onErrorResume(NoTransactionException.class, e -> { - - return Mono.subscriberContext().flatMap(it -> { - - if (it.hasKey(ReactiveTransactionSynchronization.class)) { - - ReactiveTransactionSynchronization synchronization = it.get(ReactiveTransactionSynchronization.class); - - return obtainConnection(synchronization, connectionFactory); - } - return Mono.empty(); - }).switchIfEmpty(Mono.defer(() -> { - return Mono.from(connectionFactory.create()).map(it -> Tuples.of(it, connectionFactory)); - })); + return Mono.from(connectionFactory.create()); }); } - private static Mono> obtainConnection( - ReactiveTransactionSynchronization synchronization, ConnectionFactory connectionFactory) { - - if (synchronization.isSynchronizationActive()) { - - if (logger.isDebugEnabled()) { - logger.debug("Registering transaction synchronization for R2DBC Connection"); - } - - TransactionResources txContext = synchronization.getCurrentTransaction(); - ConnectionFactory resource = txContext.getResource(ConnectionFactory.class); - - Mono> attachNewConnection = Mono - .defer(() -> Mono.from(connectionFactory.create()).map(it -> { - - if (logger.isDebugEnabled()) { - logger.debug("Fetching new R2DBC Connection from ConnectionFactory"); - } - - SingletonConnectionFactory s = new SingletonConnectionFactory(connectionFactory.getMetadata(), it); - txContext.registerResource(ConnectionFactory.class, s); - - return Tuples.of(it, connectionFactory); - })); - - return Mono.justOrEmpty(resource).flatMap(ConnectionFactoryUtils::createConnection) - .switchIfEmpty(attachNewConnection); - } - - return Mono.empty(); - } - /** * Actually fetch a {@link Connection} from the given {@link ConnectionFactory}, defensively turning an unexpected * {@code null} return value from {@link ConnectionFactory#create()} into an {@link IllegalStateException}. @@ -198,12 +152,7 @@ public abstract class ConnectionFactoryUtils { * @see ConnectionFactory#create() */ private static Mono fetchConnection(ConnectionFactory connectionFactory) { - - Publisher con = connectionFactory.create(); - if (con == null) { // TODO: seriously why would it do that? - throw new IllegalStateException("ConnectionFactory returned null from getConnection(): " + connectionFactory); - } - return Mono.from(con); + return Mono.from(connectionFactory.create()); } /** @@ -226,40 +175,24 @@ public abstract class ConnectionFactoryUtils { * Actually close the given {@link io.r2dbc.spi.Connection}, obtained from the given {@link ConnectionFactory}. Same * as {@link #releaseConnection}, but preserving the original exception. * - * @param con the {@link io.r2dbc.spi.Connection} to close if necessary. + * @param connection the {@link io.r2dbc.spi.Connection} to close if necessary. * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from (may be * {@literal null}). * @see #doGetConnection */ - public static Mono doReleaseConnection(@Nullable io.r2dbc.spi.Connection con, + public static Mono doReleaseConnection(@Nullable io.r2dbc.spi.Connection connection, @Nullable ConnectionFactory connectionFactory) { return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { ConnectionHolder conHolder = (ConnectionHolder) it.getResource(connectionFactory); - if (conHolder != null && connectionEquals(conHolder, con)) { + if (conHolder != null && connectionEquals(conHolder, connection)) { // It's the transactional Connection: Don't close it. conHolder.released(); } - return Mono.from(con.close()); + return Mono.from(connection.close()); }).onErrorResume(NoTransactionException.class, e -> { - - if (connectionFactory instanceof SingletonConnectionFactory) { - - SingletonConnectionFactory factory = (SingletonConnectionFactory) connectionFactory; - - if (logger.isDebugEnabled()) { - logger.debug("Releasing R2DBC Connection"); - } - - return factory.close(con); - } - - if (logger.isDebugEnabled()) { - logger.debug("Closing R2DBC Connection"); - } - - return Mono.from(con.close()); + return doCloseConnection(connection, connectionFactory); }); } @@ -287,55 +220,23 @@ public abstract class ConnectionFactoryUtils { */ public static Mono doCloseConnection(Connection connection, @Nullable ConnectionFactory connectionFactory) { - if (!(connectionFactory instanceof SingletonConnectionFactory) - || ((SingletonConnectionFactory) connectionFactory).shouldClose(connection)) { + if (!(connectionFactory instanceof SmartConnectionFactory) + || ((SmartConnectionFactory) connectionFactory).shouldClose(connection)) { - SingletonConnectionFactory factory = (SingletonConnectionFactory) connectionFactory; - return factory.close(connection).then(Mono.from(connection.close())); + if (logger.isDebugEnabled()) { + logger.debug("Closing R2DBC Connection"); + } + + return Mono.from(connection.close()); } return Mono.empty(); } - /** - * Obtain the currently {@link ReactiveTransactionSynchronization} from the current subscriber - * {@link reactor.util.context.Context}. - * - * @throws NoTransactionException if no active {@link ReactiveTransactionSynchronization} is associated with the - * current subscription. - * @see Mono#subscriberContext() - * @see ReactiveTransactionSynchronization - */ - public static Mono currentReactiveTransactionSynchronization() { - - return Mono.subscriberContext().filter(it -> it.hasKey(ReactiveTransactionSynchronization.class)) // - .switchIfEmpty(Mono.error(new NoTransactionException( - "Transaction management is not enabled. Make sure to register ReactiveTransactionSynchronization in the subscriber Context!"))) // - .map(it -> it.get(ReactiveTransactionSynchronization.class)); - } - - /** - * Obtain the currently active {@link ReactiveTransactionSynchronization} from the current subscriber - * {@link reactor.util.context.Context}. - * - * @throws NoTransactionException if no active {@link ReactiveTransactionSynchronization} is associated with the - * current subscription. - * @see Mono#subscriberContext() - * @see ReactiveTransactionSynchronization - */ - public static Mono currentActiveReactiveTransactionSynchronization() { - - return currentReactiveTransactionSynchronization() - .filter(ReactiveTransactionSynchronization::isSynchronizationActive) // - .switchIfEmpty(Mono.error(new NoTransactionException("ReactiveTransactionSynchronization not active!"))); - } - /** * Obtain the {@link io.r2dbc.spi.ConnectionFactory} from the current subscriber {@link reactor.util.context.Context}. * - * @see Mono#subscriberContext() - * @see ReactiveTransactionSynchronization - * @see TransactionResources + * @see TransactionSynchronizationManager */ public static Mono currentConnectionFactory(ConnectionFactory connectionFactory) { @@ -347,8 +248,7 @@ public abstract class ConnectionFactoryUtils { return true; } return false; - }).map(it -> connectionFactory) // - .onErrorResume(NoTransactionException.class, ConnectionFactoryUtils::obtainDefaultConnectionFactory); + }).map(it -> connectionFactory); } /** @@ -410,20 +310,6 @@ public abstract class ConnectionFactoryUtils { return order; } - /** - * @param e - * @return an {@link Mono#error(Throwable) error} if not transaction present. - */ - private static Mono obtainDefaultConnectionFactory(NoTransactionException e) { - - return currentActiveReactiveTransactionSynchronization().map(synchronization -> { - - TransactionResources currentSynchronization = synchronization.getCurrentTransaction(); - return currentSynchronization.getResource(ConnectionFactory.class); - }).switchIfEmpty(Mono.error( - new DataAccessResourceFailureException("Cannot extract ConnectionFactory from current TransactionContext!"))); - } - /** * Create a {@link Connection} via the given {@link ConnectionFactory#create() factory} and return a {@link Tuple2} * associating the {@link Connection} with its creating {@link ConnectionFactory}. diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/DefaultTransactionResources.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/DefaultTransactionResources.java deleted file mode 100644 index 5730454..0000000 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/DefaultTransactionResources.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2018-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.connectionfactory; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.springframework.util.Assert; - -/** - * Default implementation of {@link TransactionResources}. - * - * @author Mark Paluch - */ -class DefaultTransactionResources implements TransactionResources { - - private Map, Object> items = new ConcurrentHashMap<>(); - - /* - * (non-Javadoc) - * @see org.springframework.data.r2dbc.function.connectionfactory.TransactionResources#registerResource(java.lang.Class, java.lang.Object) - */ - @Override - public void registerResource(Class key, T value) { - - Assert.state(!items.containsKey(key), () -> String.format("Resource for %s is already bound", key)); - - items.put(key, value); - } - - /* - * (non-Javadoc) - * @see org.springframework.data.r2dbc.function.connectionfactory.TransactionResources#getResource(java.lang.Class) - */ - @SuppressWarnings("unchecked") - @Override - public T getResource(Class key) { - return (T) items.get(key); - } -} diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManager.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManager.java index d32e1d2..b7d3303 100644 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManager.java +++ b/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManager.java @@ -70,8 +70,7 @@ import org.springframework.util.Assert; * @see TransactionAwareConnectionFactoryProxy * @see DatabaseClient */ -public class R2dbcTransactionManager extends AbstractReactiveTransactionManager - implements InitializingBean { +public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean { private ConnectionFactory connectionFactory; @@ -473,12 +472,12 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager * ConnectionFactory transaction object, representing a ConnectionHolder. Used as transaction object by * ConnectionFactoryTransactionManager. */ - private static class ConnectionFactoryTransactionObject extends R2dbcTransactionObjectSupport { + private static class ConnectionFactoryTransactionObject { private boolean newConnectionHolder; void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) { - super.setConnectionHolder(connectionHolder); + setConnectionHolder(connectionHolder); this.newConnectionHolder = newConnectionHolder; } @@ -489,5 +488,41 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager void setRollbackOnly() { getConnectionHolder().setRollbackOnly(); } + + @Nullable private ConnectionHolder connectionHolder; + + @Nullable private IsolationLevel previousIsolationLevel; + + private boolean savepointAllowed = false; + + public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) { + this.connectionHolder = connectionHolder; + } + + public ConnectionHolder getConnectionHolder() { + Assert.state(this.connectionHolder != null, "No ConnectionHolder available"); + return this.connectionHolder; + } + + public boolean hasConnectionHolder() { + return (this.connectionHolder != null); + } + + public void setPreviousIsolationLevel(@Nullable IsolationLevel previousIsolationLevel) { + this.previousIsolationLevel = previousIsolationLevel; + } + + @Nullable + public IsolationLevel getPreviousIsolationLevel() { + return this.previousIsolationLevel; + } + + public void setSavepointAllowed(boolean savepointAllowed) { + this.savepointAllowed = savepointAllowed; + } + + public boolean isSavepointAllowed() { + return this.savepointAllowed; + } } } diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionObjectSupport.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionObjectSupport.java deleted file mode 100644 index 9226f18..0000000 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionObjectSupport.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.connectionfactory; - -import io.r2dbc.spi.IsolationLevel; -import org.springframework.lang.Nullable; -import org.springframework.util.Assert; - -/** - * Convenient base class for R2DBC-aware transaction objects. Can contain a {@link ConnectionHolder} with a R2DBC - * {@link io.r2dbc.spi.Connection}. - * - * @author Mark Paluch - * @see R2dbcTransactionManager - */ -public abstract class R2dbcTransactionObjectSupport { - - @Nullable private ConnectionHolder connectionHolder; - - @Nullable private IsolationLevel previousIsolationLevel; - - private boolean savepointAllowed = false; - - public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) { - this.connectionHolder = connectionHolder; - } - - public ConnectionHolder getConnectionHolder() { - Assert.state(this.connectionHolder != null, "No ConnectionHolder available"); - return this.connectionHolder; - } - - public boolean hasConnectionHolder() { - return (this.connectionHolder != null); - } - - public void setPreviousIsolationLevel(@Nullable IsolationLevel previousIsolationLevel) { - this.previousIsolationLevel = previousIsolationLevel; - } - - @Nullable - public IsolationLevel getPreviousIsolationLevel() { - return this.previousIsolationLevel; - } - - public void setSavepointAllowed(boolean savepointAllowed) { - this.savepointAllowed = savepointAllowed; - } - - public boolean isSavepointAllowed() { - return this.savepointAllowed; - } -} diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/ReactiveTransactionSynchronization.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/ReactiveTransactionSynchronization.java deleted file mode 100644 index 4c8487a..0000000 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/ReactiveTransactionSynchronization.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2018-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.connectionfactory; - -import java.util.Stack; - -import org.springframework.lang.Nullable; -import org.springframework.util.Assert; - -/** - * Central delegate that manages transactional resources. To be used by resource management code but not by typical - * application code. - *

- * Supports a list of transactional resources if synchronization is active. - *

- * Resource management code should check for subscriber {@link reactor.util.context.Context}-bound resources, e.g. R2DBC - * Connections using {@link TransactionResources#getResource(Class)}. Such code is normally not supposed to bind - * resources, as this is the responsibility of transaction managers. A further option is to lazily bind on first use if - * transaction synchronization is active, for performing transactions that span an arbitrary number of resources. - *

- * Transaction synchronization must be activated and deactivated by a transaction manager by registering - * {@link ReactiveTransactionSynchronization} in the {@link reactor.util.context.Context subscriber context}. - * - * @author Mark Paluch - */ -public class ReactiveTransactionSynchronization { - - private Stack resources = new Stack<>(); - - /** - * Return if transaction synchronization is active for the current {@link reactor.util.context.Context}. Can be called - * before register to avoid unnecessary instance creation. - */ - public boolean isSynchronizationActive() { - return !resources.isEmpty(); - } - - /** - * Create a new transaction span and register a {@link TransactionResources} instance. - * - * @param transactionResources must not be {@literal null}. - */ - public void registerTransaction(TransactionResources transactionResources) { - - Assert.notNull(transactionResources, "TransactionContext must not be null!"); - - resources.push(transactionResources); - } - - /** - * Unregister a transaction span and by removing {@link TransactionResources} instance. - * - * @param transactionResources must not be {@literal null}. - */ - public void unregisterTransaction(TransactionResources transactionResources) { - - Assert.notNull(transactionResources, "TransactionContext must not be null!"); - - resources.remove(transactionResources); - } - - /** - * @return obtain the current {@link TransactionResources} or {@literal null} if none is present. - */ - @Nullable - public TransactionResources getCurrentTransaction() { - - if (!resources.isEmpty()) { - return resources.peek(); - } - - return null; - } -} diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/SingletonConnectionFactory.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/SingletonConnectionFactory.java deleted file mode 100644 index ed993ff..0000000 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/SingletonConnectionFactory.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2018-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.connectionfactory; - -import io.r2dbc.spi.Connection; -import io.r2dbc.spi.ConnectionFactoryMetadata; -import reactor.core.publisher.Mono; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.reactivestreams.Publisher; - -/** - * Connection holder, wrapping a R2DBC Connection. - * {@link org.springframework.data.r2dbc.core.TransactionalDatabaseClient} binds instances of this class to the - * {@link TransactionResources} for a specific subscription. - * - * @author Mark Paluch - */ -class SingletonConnectionFactory implements SmartConnectionFactory { - - private final ConnectionFactoryMetadata metadata; - private final Connection connection; - private final Mono connectionMono; - private final AtomicInteger refCount = new AtomicInteger(); - - SingletonConnectionFactory(ConnectionFactoryMetadata metadata, Connection connection) { - - this.metadata = metadata; - this.connection = connection; - this.connectionMono = Mono.just(connection); - } - - /* - * (non-Javadoc) - * @see io.r2dbc.spi.ConnectionFactory#create() - */ - @Override - public Publisher create() { - - if (refCount.get() == -1) { - throw new IllegalStateException("Connection is closed!"); - } - - return connectionMono.doOnSubscribe(s -> refCount.incrementAndGet()); - } - - /* - * (non-Javadoc) - * @see io.r2dbc.spi.ConnectionFactory#getMetadata() - */ - @Override - public ConnectionFactoryMetadata getMetadata() { - return metadata; - } - - private boolean connectionEquals(Connection connection) { - return this.connection == connection; - } - - /* - * (non-Javadoc) - * @see org.springframework.data.r2dbc.function.connectionfactory.SmartConnectionFactory#shouldClose(io.r2dbc.spi.Connection) - */ - @Override - public boolean shouldClose(Connection connection) { - return refCount.get() == 1; - } - - Mono close(Connection connection) { - - if (connectionEquals(connection)) { - return Mono. empty().doOnSubscribe(s -> refCount.decrementAndGet()); - } - - throw new IllegalArgumentException("Connection is not associated with this connection factory"); - } -} diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/TransactionAwareConnectionFactoryProxy.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/TransactionAwareConnectionFactoryProxy.java index ad8650e..24d95fe 100644 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/TransactionAwareConnectionFactoryProxy.java +++ b/src/main/java/org/springframework/data/r2dbc/connectionfactory/TransactionAwareConnectionFactoryProxy.java @@ -15,30 +15,27 @@ */ package org.springframework.data.r2dbc.connectionfactory; -import static org.springframework.util.ReflectionUtils.*; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Wrapped; +import reactor.core.publisher.Mono; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; -import io.r2dbc.spi.Connection; -import io.r2dbc.spi.ConnectionFactory; -import io.r2dbc.spi.Wrapped; - import org.springframework.data.r2dbc.core.DatabaseClient; import org.springframework.lang.Nullable; import org.springframework.util.ReflectionUtils; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; /** * Proxy for a target R2DBC {@link ConnectionFactory}, adding awareness of Spring-managed transactions. *

* Data access code that should remain unaware of Spring's data access support can work with this proxy to seamlessly * participate in Spring-managed transactions. Note that the transaction manager, for example - * {@link R2dbcTransactionManager}, still needs to work with the underlying {@link ConnectionFactory}, - * not with this proxy. + * {@link R2dbcTransactionManager}, still needs to work with the underlying {@link ConnectionFactory}, not with + * this proxy. *

* Make sure that {@link TransactionAwareConnectionFactoryProxy} is the outermost {@link ConnectionFactory} of a * chain of {@link ConnectionFactory} proxies/adapters. {@link TransactionAwareConnectionFactoryProxy} can delegate @@ -101,14 +98,15 @@ public class TransactionAwareConnectionFactoryProxy extends DelegatingConnection * @see ConnectionFactoryUtils#doReleaseConnection */ protected Mono getTransactionAwareConnectionProxy(ConnectionFactory targetConnectionFactory) { - return ConnectionFactoryUtils.getConnection(targetConnectionFactory).map(TransactionAwareConnectionFactoryProxy::proxyConnection); + return ConnectionFactoryUtils.getConnection(targetConnectionFactory) + .map(it -> proxyConnection(it, targetConnectionFactory)); } - private static Connection proxyConnection(Tuple2 connectionConnectionFactoryTuple) { + private static Connection proxyConnection(Connection connection, ConnectionFactory targetConnectionFactory) { return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(), - new Class[]{ConnectionProxy.class}, - new TransactionAwareInvocationHandler(connectionConnectionFactoryTuple.getT1(), connectionConnectionFactoryTuple.getT2())); + new Class[] { ConnectionProxy.class }, + new TransactionAwareInvocationHandler(connection, targetConnectionFactory)); } /** diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/TransactionResources.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/TransactionResources.java deleted file mode 100644 index 5324ae2..0000000 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/TransactionResources.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2018-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.connectionfactory; - -import reactor.core.publisher.Mono; - -import org.springframework.data.r2dbc.core.TransactionalDatabaseClient; - -/** - * Transaction context for an ongoing transaction synchronization allowing to register transactional resources. - *

- * Supports one resource per key without overwriting, that is, a resource needs to be removed before a new one can be - * set for the same key. - *

- * Primarily used by {@link ConnectionFactoryUtils} but can be also used by application code to register resources that - * should be bound to a transaction. - * - * @author Mark Paluch - * @see TransactionalDatabaseClient - */ -public interface TransactionResources { - - /** - * Creates a new empty {@link TransactionResources}. - * - * @return the empty {@link TransactionResources}. - */ - static TransactionResources create() { - return new DefaultTransactionResources(); - } - - /** - * Retrieve a resource from this context identified by {@code key}. - * - * @param key the resource key. - * @return the resource emitted through {@link Mono} or {@link Mono#empty()} if the resource was not found. - */ - T getResource(Class key); - - /** - * Register a resource in this context. - * - * @param key the resource key. - * @param value can be a subclass of the {@code key} type. - * @throws IllegalStateException if a resource is already bound under {@code key}. - */ - void registerResource(Class key, T value); -} diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/init/DatabasePopulatorUtils.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/init/DatabasePopulatorUtils.java index 858b1a7..d8a10c4 100644 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/init/DatabasePopulatorUtils.java +++ b/src/main/java/org/springframework/data/r2dbc/connectionfactory/init/DatabasePopulatorUtils.java @@ -18,7 +18,6 @@ package org.springframework.data.r2dbc.connectionfactory.init; import io.r2dbc.spi.Connection; import io.r2dbc.spi.ConnectionFactory; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; import org.springframework.dao.DataAccessException; import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils; @@ -47,7 +46,7 @@ public abstract class DatabasePopulatorUtils { Assert.notNull(populator, "DatabasePopulator must not be null"); Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); - return Mono.usingWhen(ConnectionFactoryUtils.getConnection(connectionFactory).map(Tuple2::getT1), // + return Mono.usingWhen(ConnectionFactoryUtils.getConnection(connectionFactory), // populator::populate, // it -> ConnectionFactoryUtils.releaseConnection(it, connectionFactory), // it -> ConnectionFactoryUtils.releaseConnection(it, connectionFactory)) diff --git a/src/main/java/org/springframework/data/r2dbc/core/DatabaseClient.java b/src/main/java/org/springframework/data/r2dbc/core/DatabaseClient.java index 9ea8337..ff17135 100644 --- a/src/main/java/org/springframework/data/r2dbc/core/DatabaseClient.java +++ b/src/main/java/org/springframework/data/r2dbc/core/DatabaseClient.java @@ -51,12 +51,9 @@ public interface DatabaseClient { * the exchange. The SQL string can contain either native parameter bind markers or named parameters (e.g. * {@literal :foo, :bar}) when {@link NamedParameterExpander} is enabled. * - * @see NamedParameterExpander - * @see DatabaseClient.Builder#namedParameters(NamedParameterExpander) * @param sql must not be {@literal null} or empty. * @return a new {@link GenericExecuteSpec}. - * @see NamedParameterExpander - * @see DatabaseClient.Builder#namedParameters(NamedParameterExpander) + * @see NamedParameterExpander * @see DatabaseClient.Builder#namedParameters(boolean) */ GenericExecuteSpec execute(String sql); @@ -71,19 +68,11 @@ public interface DatabaseClient { * @param sqlSupplier must not be {@literal null}. * @return a new {@link GenericExecuteSpec}. * @see NamedParameterExpander - * @see DatabaseClient.Builder#namedParameters(NamedParameterExpander) + * @see DatabaseClient.Builder#namedParameters(boolean) * @see PreparedOperation */ GenericExecuteSpec execute(Supplier sqlSupplier); - /** - * Prepare an SQL call returning a result. - * - * @deprecated will be removed with 1.0 M3. Use {@link #execute(String)} directly. - */ - @Deprecated - SqlSpec execute(); - /** * Prepare an SQL SELECT call. */ @@ -181,41 +170,6 @@ public interface DatabaseClient { DatabaseClient build(); } - /** - * Contract for specifying a SQL call along with options leading to the exchange. The SQL string can contain either - * native parameter bind markers (e.g. {@literal $1, $2} for Postgres, {@literal @P0, @P1} for SQL Server) or named - * parameters (e.g. {@literal :foo, :bar}) when {@link NamedParameterExpander} is enabled. - *

- * Accepts {@link PreparedOperation} as SQL and binding {@link Supplier}. - *

- * - * @see NamedParameterExpander - * @see DatabaseClient.Builder#namedParameters(NamedParameterExpander) - * @deprecated use {@code DatabaseClient.execute(…)} directly. - */ - @Deprecated - interface SqlSpec { - - /** - * Specify a static {@code sql} string to execute. - * - * @param sql must not be {@literal null} or empty. - * @return a new {@link GenericExecuteSpec}. - */ - @Deprecated - GenericExecuteSpec sql(String sql); - - /** - * Specify a static {@link Supplier SQL supplier} that provides SQL to execute. - * - * @param sqlSupplier must not be {@literal null}. - * @return a new {@link GenericExecuteSpec}. - * @see PreparedOperation - */ - @Deprecated - GenericExecuteSpec sql(Supplier sqlSupplier); - } - /** * Contract for specifying a SQL call along with options leading to the exchange. */ diff --git a/src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java b/src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java index 815a3a4..9792f86 100644 --- a/src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java +++ b/src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java @@ -25,7 +25,6 @@ import io.r2dbc.spi.Statement; import lombok.RequiredArgsConstructor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; @@ -46,6 +45,7 @@ import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; + import org.springframework.dao.DataAccessException; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.Pageable; @@ -98,11 +98,6 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor { return this.builder; } - @Override - public SqlSpec execute() { - return new DefaultSqlSpec(); - } - @Override public SelectFromSpec select() { return new DefaultSelectFromSpec(); @@ -201,7 +196,7 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor { * @return a {@link Mono} able to emit a {@link Connection}. */ protected Mono getConnection() { - return ConnectionFactoryUtils.getConnection(obtainConnectionFactory()).map(Tuple2::getT1); + return ConnectionFactoryUtils.getConnection(obtainConnectionFactory()); } /** @@ -307,27 +302,6 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor { }); } - /** - * Default {@link DatabaseClient.SqlSpec} implementation. - */ - private class DefaultSqlSpec implements SqlSpec { - - @Override - public GenericExecuteSpec sql(String sql) { - - Assert.hasText(sql, "SQL must not be null or empty!"); - return sql(() -> sql); - } - - @Override - public GenericExecuteSpec sql(Supplier sqlSupplier) { - - Assert.notNull(sqlSupplier, "SQL Supplier must not be null!"); - - return createGenericExecuteSpec(sqlSupplier); - } - } - /** * Base class for {@link DatabaseClient.GenericExecuteSpec} implementations. */ diff --git a/src/main/java/org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClient.java b/src/main/java/org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClient.java deleted file mode 100644 index 1d7362e..0000000 --- a/src/main/java/org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClient.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright 2018-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.core; - -import io.r2dbc.spi.Connection; -import io.r2dbc.spi.ConnectionFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.context.Context; -import reactor.util.function.Tuple2; - -import java.util.function.Function; - -import org.reactivestreams.Publisher; -import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils; -import org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization; -import org.springframework.data.r2dbc.connectionfactory.TransactionResources; -import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator; -import org.springframework.transaction.NoTransactionException; - -/** - * Default implementation of a {@link TransactionalDatabaseClient}. - * - * @author Mark Paluch - */ -class DefaultTransactionalDatabaseClient extends DefaultDatabaseClient implements TransactionalDatabaseClient { - - DefaultTransactionalDatabaseClient(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator, - ReactiveDataAccessStrategy dataAccessStrategy, boolean namedParameters, - DefaultDatabaseClientBuilder builder) { - super(connector, exceptionTranslator, dataAccessStrategy, namedParameters, builder); - } - - @Override - public TransactionalDatabaseClient.Builder mutate() { - return (TransactionalDatabaseClient.Builder) super.mutate(); - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#beginTransaction() - */ - @Override - public Mono beginTransaction() { - - Mono transactional = ConnectionFactoryUtils.currentReactiveTransactionSynchronization() // - .map(synchronization -> { - - TransactionResources transactionResources = TransactionResources.create(); - // TODO: This Tx management code creating a TransactionContext. Find a better place. - synchronization.registerTransaction(transactionResources); - return transactionResources; - }); - - return transactional.flatMap(it -> { - return ConnectionFactoryUtils.doGetConnection(obtainConnectionFactory()); - }).flatMap(it -> Mono.from(it.getT1().beginTransaction())); - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#commitTransaction() - */ - @Override - public Mono commitTransaction() { - return cleanup(Connection::commitTransaction); - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#rollbackTransaction() - */ - @Override - public Mono rollbackTransaction() { - return cleanup(Connection::rollbackTransaction); - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#inTransaction(java.util.function.Function) - */ - @Override - public Flux inTransaction(Function> callback) { - - return Flux.usingWhen(beginTransaction().thenReturn(this), callback, // - DefaultTransactionalDatabaseClient::commitTransaction, // - DefaultTransactionalDatabaseClient::rollbackTransaction, // - DefaultTransactionalDatabaseClient::rollbackTransaction) // - .subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization); - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.DefaultDatabaseClient#getConnection() - */ - @Override - protected Mono getConnection() { - return ConnectionFactoryUtils.getConnection(obtainConnectionFactory()).map(Tuple2::getT1); - } - - /** - * Execute a transactional cleanup. Also, deregister the current {@link TransactionResources synchronization} element. - */ - private static Mono cleanup(Function> callback) { - - return ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization() // - .flatMap(synchronization -> { - - TransactionResources currentSynchronization = synchronization.getCurrentTransaction(); - - ConnectionFactory connectionFactory = currentSynchronization.getResource(ConnectionFactory.class); - - if (connectionFactory == null) { - throw new NoTransactionException("No ConnectionFactory attached"); - } - - return Mono.from(connectionFactory.create()) - .flatMap(connection -> Mono.from(callback.apply(connection)) - .then(ConnectionFactoryUtils.releaseConnection(connection, connectionFactory)) - .then(ConnectionFactoryUtils.closeConnection(connection, connectionFactory))) // TODO: Is this rather - // related to - // TransactionContext - // cleanup? - .doFinally(s -> synchronization.unregisterTransaction(currentSynchronization)); - }); - } - - /** - * Potentially register a {@link ReactiveTransactionSynchronization} in the {@link Context} if no synchronization - * object is registered. - * - * @param context the subscriber context. - * @return subscriber context with a registered synchronization. - */ - static Context withTransactionSynchronization(Context context) { - - // associate synchronizer object to host transactional resources. - // TODO: Should be moved to a better place. - return context.put(ReactiveTransactionSynchronization.class, - context.getOrDefault(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization())); - } -} diff --git a/src/main/java/org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClientBuilder.java b/src/main/java/org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClientBuilder.java deleted file mode 100644 index 84de8ba..0000000 --- a/src/main/java/org/springframework/data/r2dbc/core/DefaultTransactionalDatabaseClientBuilder.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2018-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.core; - -import io.r2dbc.spi.ConnectionFactory; - -import java.util.function.Consumer; - -import org.springframework.data.r2dbc.core.DatabaseClient.Builder; -import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator; -import org.springframework.util.Assert; - -/** - * @author Mark Paluch - */ -class DefaultTransactionalDatabaseClientBuilder extends DefaultDatabaseClientBuilder - implements TransactionalDatabaseClient.Builder { - - DefaultTransactionalDatabaseClientBuilder() {} - - DefaultTransactionalDatabaseClientBuilder(DefaultDatabaseClientBuilder other) { - - super(other); - Assert.notNull(other, "DefaultDatabaseClientBuilder must not be null!"); - } - - @Override - public DatabaseClient.Builder clone() { - return new DefaultTransactionalDatabaseClientBuilder(this); - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#connectionFactory(io.r2dbc.spi.ConnectionFactory) - */ - @Override - public TransactionalDatabaseClient.Builder connectionFactory(ConnectionFactory factory) { - super.connectionFactory(factory); - return this; - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#exceptionTranslator(org.springframework.data.r2dbc.support.R2dbcExceptionTranslator) - */ - @Override - public TransactionalDatabaseClient.Builder exceptionTranslator(R2dbcExceptionTranslator exceptionTranslator) { - super.exceptionTranslator(exceptionTranslator); - return this; - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#dataAccessStrategy(org.springframework.data.r2dbc.function.ReactiveDataAccessStrategy) - */ - @Override - public TransactionalDatabaseClient.Builder dataAccessStrategy(ReactiveDataAccessStrategy accessStrategy) { - super.dataAccessStrategy(accessStrategy); - return this; - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#dataAccessStrategy(boolean) - */ - @Override - public TransactionalDatabaseClient.Builder namedParameters(boolean enabled) { - super.namedParameters(enabled); - return this; - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#apply(java.util.function.Consumer) - */ - @Override - public TransactionalDatabaseClient.Builder apply(Consumer builderConsumer) { - super.apply(builderConsumer); - return this; - } - - /* (non-Javadoc) - * @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#build() - */ - @Override - public TransactionalDatabaseClient build() { - return (TransactionalDatabaseClient) super.build(); - } - - @Override - protected DatabaseClient doBuild(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator, - ReactiveDataAccessStrategy accessStrategy, boolean namedParameters, - DefaultDatabaseClientBuilder builder) { - return new DefaultTransactionalDatabaseClient(connector, exceptionTranslator, accessStrategy, namedParameters, - builder); - } -} diff --git a/src/main/java/org/springframework/data/r2dbc/core/TransactionalDatabaseClient.java b/src/main/java/org/springframework/data/r2dbc/core/TransactionalDatabaseClient.java deleted file mode 100644 index d7ae862..0000000 --- a/src/main/java/org/springframework/data/r2dbc/core/TransactionalDatabaseClient.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright 2018-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.core; - -import io.r2dbc.spi.ConnectionFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.function.Consumer; -import java.util.function.Function; - -import org.reactivestreams.Publisher; -import org.springframework.data.r2dbc.connectionfactory.TransactionResources; -import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator; -import org.springframework.transaction.reactive.TransactionalOperator; -import org.springframework.util.Assert; - -/** - * {@link DatabaseClient} that participates in an ongoing transaction if the subscription happens within a hosted - * transaction. Alternatively, transactions can be started and cleaned up using {@link #beginTransaction()} and - * {@link #commitTransaction()}. - *

- * Transactional resources are bound to - * {@link org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization} through nested - * {@link TransactionContext} enabling nested (parallel) transactions. The simplemost approach to use transactions is by - * using {@link #inTransaction(Function)} which will start a transaction and commit it on successful termination. The - * callback allows execution of multiple statements within the same transaction. - * - *

- * Flux transactionalFlux = databaseClient.inTransaction(db -> {
- *
- * 	return db.execute("INSERT INTO person (id, firstname, lastname) VALUES(:id, :firstname, :lastname)") //
- * 			.bind("id", 1) //
- * 			.bind("firstname", "Walter") //
- * 			.bind("lastname", "White") //
- * 			.fetch().rowsUpdated();
- * });
- * 
- * - * Alternatively, transactions can be controlled by using {@link #beginTransaction()} and {@link #commitTransaction()} - * methods. This approach requires {@link #enableTransactionSynchronization(Publisher) enabling of transaction - * synchronization} for the transactional operation. - * - *
- * Mono mono = databaseClient.beginTransaction()
- * 		.then(databaseClient.execute()
- * 				.execute("INSERT INTO person (id, firstname, lastname) VALUES(:id, :firstname, :lastname)") //
- * 				.bind("id", 1) //
- * 				.bind("firstname", "Walter") //
- * 				.bind("lastname", "White") //
- * 				.fetch().rowsUpdated())
- * 		.then(databaseClient.commitTransaction());
- *
- * Mono transactionalMono = databaseClient.enableTransactionSynchronization(mono);
- * 
- *

- * This {@link DatabaseClient} can be safely used without transaction synchronization to invoke database functionality - * in auto-commit transactions. - * - * @author Mark Paluch - * @see #inTransaction(Function) - * @see #enableTransactionSynchronization(Publisher) - * @see #beginTransaction() - * @see #commitTransaction() - * @see #rollbackTransaction() - * @see org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization - * @see TransactionResources - * @see org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils - * @deprecated Use {@link DatabaseClient} in combination with {@link TransactionalOperator}. - */ -@Deprecated -public interface TransactionalDatabaseClient extends DatabaseClient { - - /** - * Start a transaction and bind connection resources to the subscriber context. - * - * @return - */ - Mono beginTransaction(); - - /** - * Commit a transaction and unbind connection resources from the subscriber context. - * - * @return - * @throws org.springframework.transaction.NoTransactionException if no transaction is ongoing. - */ - Mono commitTransaction(); - - /** - * Rollback a transaction and unbind connection resources from the subscriber context. - * - * @return - * @throws org.springframework.transaction.NoTransactionException if no transaction is ongoing. - */ - Mono rollbackTransaction(); - - /** - * Execute a {@link Function} accepting a {@link DatabaseClient} within a managed transaction. {@link Exception Error - * signals} cause the transaction to be rolled back. - * - * @param callback - * @return the callback result. - */ - Flux inTransaction(Function> callback); - - /** - * Enable transaction management so that connections can be bound to the subscription. - * - * @param publisher must not be {@literal null}. - * @return the Transaction-enabled {@link Mono}. - */ - default Mono enableTransactionSynchronization(Mono publisher) { - - Assert.notNull(publisher, "Publisher must not be null!"); - - return publisher.subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization); - } - - /** - * Enable transaction management so that connections can be bound to the subscription. - * - * @param publisher must not be {@literal null}. - * @return the Transaction-enabled {@link Flux}. - */ - default Flux enableTransactionSynchronization(Publisher publisher) { - - Assert.notNull(publisher, "Publisher must not be null!"); - - return Flux.from(publisher).subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization); - } - - /** - * Return a builder to mutate properties of this database client. - */ - TransactionalDatabaseClient.Builder mutate(); - - // Static, factory methods - - /** - * A variant of {@link #create(ConnectionFactory)} that accepts a {@link io.r2dbc.spi.ConnectionFactory}. - */ - static TransactionalDatabaseClient create(ConnectionFactory factory) { - return (TransactionalDatabaseClient) new DefaultTransactionalDatabaseClientBuilder().connectionFactory(factory) - .build(); - } - - /** - * Obtain a {@code DatabaseClient} builder. - */ - static TransactionalDatabaseClient.Builder builder() { - return new DefaultTransactionalDatabaseClientBuilder(); - } - - /** - * A mutable builder for creating a {@link TransactionalDatabaseClient}. - */ - interface Builder extends DatabaseClient.Builder { - - /** - * Configures the {@link ConnectionFactory R2DBC connector}. - * - * @param factory must not be {@literal null}. - * @return {@code this} {@link Builder}. - */ - Builder connectionFactory(ConnectionFactory factory); - - /** - * Configures a {@link R2dbcExceptionTranslator}. - * - * @param exceptionTranslator must not be {@literal null}. - * @return {@code this} {@link Builder}. - */ - Builder exceptionTranslator(R2dbcExceptionTranslator exceptionTranslator); - - /** - * Configures a {@link ReactiveDataAccessStrategy}. - * - * @param accessStrategy must not be {@literal null}. - * @return {@code this} {@link Builder}. - */ - Builder dataAccessStrategy(ReactiveDataAccessStrategy accessStrategy); - - /** - * Configures whether to use named parameter expansion. Defaults to {@literal true}. - * - * @param enabled {@literal true} to use named parameter expansion. {@literal false} to disable named parameter - * expansion. - * @return {@code this} {@link DatabaseClient.Builder}. - * @see NamedParameterExpander - */ - Builder namedParameters(boolean enabled); - - /** - * Configures a {@link Consumer} to configure this builder. - * - * @param builderConsumer must not be {@literal null}. - * @return {@code this} {@link Builder}. - */ - Builder apply(Consumer builderConsumer); - - @Override - TransactionalDatabaseClient build(); - } -} diff --git a/src/test/java/org/springframework/data/r2dbc/config/H2IntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/config/H2IntegrationTests.java index d045065..aa00561 100644 --- a/src/test/java/org/springframework/data/r2dbc/config/H2IntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/config/H2IntegrationTests.java @@ -70,7 +70,7 @@ public class H2IntegrationTests { jdbc.execute("INSERT INTO legoset (id, name, manual) VALUES(42055, 'SCHAUFELRADBAGGER', 12)"); - databaseClient.execute().sql("SELECT COUNT(*) FROM legoset") // + databaseClient.execute("SELECT COUNT(*) FROM legoset") // .as(Long.class) // .fetch() // .all() // diff --git a/src/test/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtilsUnitTests.java b/src/test/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtilsUnitTests.java deleted file mode 100644 index 29c425f..0000000 --- a/src/test/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtilsUnitTests.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright 2018-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.r2dbc.connectionfactory; - -import static org.mockito.Mockito.*; - -import io.r2dbc.spi.Connection; -import io.r2dbc.spi.ConnectionFactory; -import org.assertj.core.api.Assertions; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils; -import org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization; -import org.springframework.data.r2dbc.connectionfactory.TransactionResources; -import org.springframework.transaction.NoTransactionException; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -/** - * Unit tests for {@link ConnectionFactoryUtils}. - * - * @author Mark Paluch - * @author Christoph Strobl - */ -public class ConnectionFactoryUtilsUnitTests { - - @Test // gh-107 - public void currentReactiveTransactionSynchronizationShouldReportSynchronization() { - - ConnectionFactoryUtils.currentReactiveTransactionSynchronization() // - .subscriberContext( - it -> it.put(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization())) - .as(StepVerifier::create) // - .expectNextCount(1) // - .verifyComplete(); - } - - @Test // gh-107 - public void currentReactiveTransactionSynchronizationShouldFailWithoutTxMgmt() { - - ConnectionFactoryUtils.currentReactiveTransactionSynchronization() // - .as(StepVerifier::create) // - .expectError(NoTransactionException.class) // - .verify(); - } - - @Test // gh-107 - public void currentActiveReactiveTransactionSynchronizationShouldReportSynchronization() { - - ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization() // - .subscriberContext(it -> { - ReactiveTransactionSynchronization sync = new ReactiveTransactionSynchronization(); - sync.registerTransaction(TransactionResources.create()); - return it.put(ReactiveTransactionSynchronization.class, sync); - }).as(StepVerifier::create) // - .expectNextCount(1) // - .verifyComplete(); - } - - @Test // gh-107 - public void currentActiveReactiveTransactionSynchronization() { - - ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization() // - .subscriberContext( - it -> it.put(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization())) - .as(StepVerifier::create) // - .expectError(NoTransactionException.class) // - .verify(); - } - - @Test // gh-107 - public void currentConnectionFactoryShouldReportConnectionFactory() { - - ConnectionFactory factoryMock = mock(ConnectionFactory.class); - - ConnectionFactoryUtils.currentConnectionFactory(factoryMock) // - .subscriberContext(it -> { - ReactiveTransactionSynchronization sync = new ReactiveTransactionSynchronization(); - TransactionResources resources = TransactionResources.create(); - resources.registerResource(ConnectionFactory.class, factoryMock); - sync.registerTransaction(resources); - return it.put(ReactiveTransactionSynchronization.class, sync); - }).as(StepVerifier::create) // - .expectNext(factoryMock) // - .verifyComplete(); - } - - @Test // gh-107 - public void connectionFactoryRetunsConnectionWhenNoSyncronisationActive() { - - ConnectionFactory factoryMock = mock(ConnectionFactory.class); - Connection connection = mock(Connection.class); - Publisher p = Mono.just(connection); - doReturn(p).when(factoryMock).create(); - - ConnectionFactoryUtils.getConnection(factoryMock) // - .as(StepVerifier::create) // - .consumeNextWith(it -> { - Assertions.assertThat(it.getT1()).isEqualTo(connection); - Assertions.assertThat(it.getT2()).isEqualTo(factoryMock); - }) - .verifyComplete(); - } -} diff --git a/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java b/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java index aa5bb79..465fde3 100644 --- a/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java +++ b/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java @@ -71,7 +71,7 @@ public class R2dbcTransactionManagerUnitTests { TransactionalOperator operator = TransactionalOperator.create(tm); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1).flatMap(it -> { + ConnectionFactoryUtils.getConnection(connectionFactoryMock).flatMap(it -> { return TransactionSynchronizationManager.forCurrentTransaction() .doOnNext(synchronizationManager -> synchronizationManager.registerSynchronization(sync)); @@ -153,7 +153,7 @@ public class R2dbcTransactionManagerUnitTests { TransactionalOperator operator = TransactionalOperator.create(tm); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1) // + ConnectionFactoryUtils.getConnection(connectionFactoryMock) // .doOnNext(it -> { it.createStatement("foo"); }).then() // @@ -179,7 +179,7 @@ public class R2dbcTransactionManagerUnitTests { TransactionalOperator operator = TransactionalOperator.create(tm); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1).doOnNext(it -> { + ConnectionFactoryUtils.getConnection(connectionFactoryMock).doOnNext(it -> { throw new IllegalStateException(); diff --git a/src/test/java/org/springframework/data/r2dbc/connectionfactory/TransactionAwareConnectionFactoryProxyUnitTests.java b/src/test/java/org/springframework/data/r2dbc/connectionfactory/TransactionAwareConnectionFactoryProxyUnitTests.java index 261fd2e..199777d 100644 --- a/src/test/java/org/springframework/data/r2dbc/connectionfactory/TransactionAwareConnectionFactoryProxyUnitTests.java +++ b/src/test/java/org/springframework/data/r2dbc/connectionfactory/TransactionAwareConnectionFactoryProxyUnitTests.java @@ -18,20 +18,17 @@ package org.springframework.data.r2dbc.connectionfactory; import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; -import java.util.concurrent.atomic.AtomicReference; - import io.r2dbc.spi.Connection; import io.r2dbc.spi.ConnectionFactory; -import org.junit.Before; -import org.junit.Test; -import org.springframework.data.r2dbc.connectionfactory.R2dbcTransactionManager; -import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils; -import org.springframework.data.r2dbc.connectionfactory.ConnectionProxy; -import org.springframework.data.r2dbc.connectionfactory.TransactionAwareConnectionFactoryProxy; -import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import reactor.util.function.Tuple2; + +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; + +import org.springframework.transaction.reactive.TransactionalOperator; /** * Unit tests for {@link TransactionAwareConnectionFactoryProxy}. @@ -63,20 +60,17 @@ public class TransactionAwareConnectionFactoryProxyUnitTests { .as(StepVerifier::create) // .consumeNextWith(connection -> { assertThat(connection).isInstanceOf(ConnectionProxy.class); - }) - .verifyComplete(); + }).verifyComplete(); } @Test // gh-107 public void unwrapShouldReturnTargetConnection() { new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() // - .map(ConnectionProxy.class::cast) - .as(StepVerifier::create) // + .map(ConnectionProxy.class::cast).as(StepVerifier::create) // .consumeNextWith(proxy -> { assertThat(proxy.unwrap()).isEqualTo(connectionMock1); - }) - .verifyComplete(); + }).verifyComplete(); } @Test // gh-107 @@ -85,25 +79,21 @@ public class TransactionAwareConnectionFactoryProxyUnitTests { when(connectionMock1.close()).thenReturn(Mono.empty()); new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() // - .map(ConnectionProxy.class::cast) - .flatMap(it -> Mono.from(it.close()).then(Mono.just(it))) + .map(ConnectionProxy.class::cast).flatMap(it -> Mono.from(it.close()).then(Mono.just(it))) .as(StepVerifier::create) // .consumeNextWith(proxy -> { assertThat(proxy.unwrap()).isEqualTo(connectionMock1); - }) - .verifyComplete(); + }).verifyComplete(); } @Test // gh-107 public void getTargetConnectionShouldReturnTargetConnection() { new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() // - .map(ConnectionProxy.class::cast) - .as(StepVerifier::create) // + .map(ConnectionProxy.class::cast).as(StepVerifier::create) // .consumeNextWith(proxy -> { assertThat(proxy.getTargetConnection()).isEqualTo(connectionMock1); - }) - .verifyComplete(); + }).verifyComplete(); } @Test // gh-107 @@ -112,38 +102,32 @@ public class TransactionAwareConnectionFactoryProxyUnitTests { when(connectionMock1.close()).thenReturn(Mono.empty()); new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() // - .map(ConnectionProxy.class::cast) - .flatMap(it -> Mono.from(it.close()).then(Mono.just(it))) + .map(ConnectionProxy.class::cast).flatMap(it -> Mono.from(it.close()).then(Mono.just(it))) .as(StepVerifier::create) // .consumeNextWith(proxy -> { assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> proxy.getTargetConnection()); - }) - .verifyComplete(); + }).verifyComplete(); } @Test // gh-107 public void hashCodeShouldReturnProxyHash() { new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() // - .map(ConnectionProxy.class::cast) - .as(StepVerifier::create) // + .map(ConnectionProxy.class::cast).as(StepVerifier::create) // .consumeNextWith(proxy -> { assertThat(proxy.hashCode()).isEqualTo(System.identityHashCode(proxy)); - }) - .verifyComplete(); + }).verifyComplete(); } @Test // gh-107 public void equalsShouldCompareCorrectly() { new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() // - .map(ConnectionProxy.class::cast) - .as(StepVerifier::create) // + .map(ConnectionProxy.class::cast).as(StepVerifier::create) // .consumeNextWith(proxy -> { assertThat(proxy.equals(proxy)).isTrue(); assertThat(proxy.equals(connectionMock1)).isFalse(); - }) - .verifyComplete(); + }).verifyComplete(); } @Test // gh-107 @@ -158,16 +142,16 @@ public class TransactionAwareConnectionFactoryProxyUnitTests { TransactionAwareConnectionFactoryProxy proxyCf = new TransactionAwareConnectionFactoryProxy(connectionFactoryMock); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1) // + ConnectionFactoryUtils.getConnection(connectionFactoryMock) // .doOnNext(transactionalConnection::set).flatMap(it -> { - return proxyCf.create().doOnNext(connectionFromProxy -> { + return proxyCf.create().doOnNext(connectionFromProxy -> { - ConnectionProxy connectionProxy = (ConnectionProxy) connectionFromProxy; - assertThat(connectionProxy.getTargetConnection()).isSameAs(it); - assertThat(connectionProxy.unwrap()).isSameAs(it); - }); - }).as(rxtx::transactional) // + ConnectionProxy connectionProxy = (ConnectionProxy) connectionFromProxy; + assertThat(connectionProxy.getTargetConnection()).isSameAs(it); + assertThat(connectionProxy.unwrap()).isSameAs(it); + }); + }).as(rxtx::transactional) // .flatMapMany(Connection::close) // .as(StepVerifier::create) // .verifyComplete(); diff --git a/src/test/java/org/springframework/data/r2dbc/core/AbstractDatabaseClientIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/core/AbstractDatabaseClientIntegrationTests.java index 003a325..1f3b83d 100644 --- a/src/test/java/org/springframework/data/r2dbc/core/AbstractDatabaseClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/core/AbstractDatabaseClientIntegrationTests.java @@ -347,7 +347,7 @@ public abstract class AbstractDatabaseClientIntegrationTests extends R2dbcIntegr DatabaseClient databaseClient = DatabaseClient.create(connectionFactory); - databaseClient.execute().sql("SELECT COUNT(*) FROM legoset") // + databaseClient.execute("SELECT COUNT(*) FROM legoset") // .as(Long.class) // .fetch() // .all() // @@ -355,7 +355,7 @@ public abstract class AbstractDatabaseClientIntegrationTests extends R2dbcIntegr .expectNext(1L) // .verifyComplete(); - databaseClient.execute().sql("SELECT name FROM legoset") // + databaseClient.execute("SELECT name FROM legoset") // .as(String.class) // .fetch() // .one() // diff --git a/src/test/java/org/springframework/data/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java index 4d4f115..f724fbb 100644 --- a/src/test/java/org/springframework/data/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java @@ -17,17 +17,18 @@ package org.springframework.data.r2dbc.core; import static org.assertj.core.api.Assertions.*; -import javax.sql.DataSource; -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; - import io.r2dbc.spi.ConnectionFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import javax.sql.DataSource; + import org.assertj.core.api.Condition; import org.junit.After; import org.junit.Before; import org.junit.Test; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; @@ -36,22 +37,16 @@ import org.springframework.context.support.GenericApplicationContext; import org.springframework.dao.DataAccessException; import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration; import org.springframework.data.r2dbc.connectionfactory.R2dbcTransactionManager; -import org.springframework.data.r2dbc.core.DatabaseClient; -import org.springframework.data.r2dbc.core.TransactionalDatabaseClient; import org.springframework.data.r2dbc.testing.R2dbcIntegrationTestSupport; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.transaction.NoTransactionException; import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.reactive.TransactionalOperator; import org.springframework.transaction.support.DefaultTransactionDefinition; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; /** - * Abstract base class for integration tests for {@link TransactionalDatabaseClient}. + * Abstract base class for transactional integration tests for {@link DatabaseClient}. * * @author Mark Paluch * @author Christoph Strobl @@ -65,6 +60,10 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend AnnotationConfigApplicationContext context; TransactionalService service; + DatabaseClient databaseClient; + R2dbcTransactionManager transactionManager; + TransactionalOperator rxtx; + @Before public void before() { @@ -80,10 +79,13 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend jdbc = createJdbcTemplate(createDataSource()); try { jdbc.execute("DROP TABLE legoset"); - } catch (DataAccessException e) { - } + } catch (DataAccessException e) {} jdbc.execute(getCreateTableStatement()); jdbc.execute("DELETE FROM legoset"); + + databaseClient = DatabaseClient.create(connectionFactory); + transactionManager = new R2dbcTransactionManager(connectionFactory); + rxtx = TransactionalOperator.create(transactionManager); } @After @@ -143,15 +145,12 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend @Test // gh-2 public void executeInsertInManagedTransaction() { - TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory); - - Flux integerFlux = databaseClient.inTransaction(db -> db // + Mono integerFlux = databaseClient // .execute(getInsertIntoLegosetStatement()) // .bind(0, 42055) // .bind(1, "SCHAUFELRADBAGGER") // .bindNull(2, Integer.class) // - .fetch().rowsUpdated() // - ); + .fetch().rowsUpdated().as(rxtx::transactional); integerFlux.as(StepVerifier::create) // .expectNext(1) // @@ -163,13 +162,11 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend @Test // gh-2 public void executeInsertInAutoCommitTransaction() { - TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory); - Mono integerFlux = databaseClient.execute(getInsertIntoLegosetStatement()) // .bind(0, 42055) // .bind(1, "SCHAUFELRADBAGGER") // .bindNull(2, Integer.class) // - .fetch().rowsUpdated(); + .fetch().rowsUpdated().as(rxtx::transactional); integerFlux.as(StepVerifier::create) // .expectNext(1) // @@ -178,58 +175,15 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend assertThat(jdbc.queryForMap("SELECT id, name, manual FROM legoset")).hasEntrySatisfying("id", numberOf(42055)); } - @Test // gh-2 - public void shouldManageUserTransaction() { - - Queue transactionIds = new ArrayBlockingQueue<>(5); - TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory); - - Flux txId = databaseClient // - .execute(getCurrentTransactionIdStatement()) // - .map((r, md) -> r.get(0, Long.class)) // - .all(); - - Mono then = databaseClient.enableTransactionSynchronization(databaseClient.beginTransaction() // - .thenMany(txId.concatWith(txId).doOnNext(transactionIds::add)) // - .then(databaseClient.rollbackTransaction())); - - then.as(StepVerifier::create) // - .verifyComplete(); - - List listOfTxIds = new ArrayList<>(transactionIds); - assertThat(listOfTxIds).hasSize(2); - assertThat(listOfTxIds).containsExactly(listOfTxIds.get(1), listOfTxIds.get(0)); - } - - @Test // gh-2 - public void userTransactionManagementShouldFailWithoutSynchronizer() { - - TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory); - - Mono then = databaseClient.beginTransaction().then(databaseClient.rollbackTransaction()); - - then.as(StepVerifier::create) // - .consumeErrorWith(exception -> { - - assertThat(exception).isInstanceOf(NoTransactionException.class) - .hasMessageContaining("Transaction management is not enabled"); - }).verify(); - } - @Test // gh-2 public void shouldRollbackTransaction() { - TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory); - - Flux integerFlux = databaseClient.inTransaction(db -> { - - return db.execute(getInsertIntoLegosetStatement()) // - .bind(0, 42055) // - .bind(1, "SCHAUFELRADBAGGER") // - .bindNull(2, Integer.class) // - .fetch().rowsUpdated() // - .then(Mono.error(new IllegalStateException("failed"))); - }); + Mono integerFlux = databaseClient.execute(getInsertIntoLegosetStatement()) // + .bind(0, 42055) // + .bind(1, "SCHAUFELRADBAGGER") // + .bindNull(2, Integer.class) // + .fetch().rowsUpdated() // + .then(Mono.error(new IllegalStateException("failed"))).as(rxtx::transactional); integerFlux.as(StepVerifier::create) // .expectError(IllegalStateException.class) // @@ -242,17 +196,12 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend @Test // gh-2, gh-75, gh-107 public void emitTransactionIds() { - DatabaseClient databaseClient = DatabaseClient.create(connectionFactory); - - TransactionalOperator transactionalOperator = TransactionalOperator - .create(new R2dbcTransactionManager(connectionFactory), new DefaultTransactionDefinition()); - Flux txId = databaseClient.execute(getCurrentTransactionIdStatement()) // .map((row, md) -> row.get(0)) // .all(); Flux transactionIds = prepareForTransaction(databaseClient).thenMany(txId.concatWith(txId)) // - .as(transactionalOperator::transactional); + .as(rxtx::transactional); transactionIds.collectList().as(StepVerifier::create) // .consumeNextWith(actual -> { @@ -271,8 +220,7 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend TransactionalOperator transactionalOperator = TransactionalOperator .create(new R2dbcTransactionManager(connectionFactory), new DefaultTransactionDefinition()); - Flux integerFlux = databaseClient.execute() // - .sql(getInsertIntoLegosetStatement()) // + Flux integerFlux = databaseClient.execute(getInsertIntoLegosetStatement()) // .bind(0, 42055) // .bind(1, "SCHAUFELRADBAGGER") // .bindNull(2, Integer.class) // @@ -290,10 +238,11 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend assertThat(count).isEqualTo(0); } - @Test //gh-107 + @Test // gh-107 public void emitTransactionIdsUsingManagedTransactions() { - service.emitTransactionIds(prepareForTransaction(service.getDatabaseClient()), getCurrentTransactionIdStatement()).collectList().as(StepVerifier::create) // + service.emitTransactionIds(prepareForTransaction(service.getDatabaseClient()), getCurrentTransactionIdStatement()) + .collectList().as(StepVerifier::create) // .consumeNextWith(actual -> { assertThat(actual).hasSize(2); @@ -352,19 +301,17 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend @Transactional public Flux emitTransactionIds(Mono prepareTransaction, String idStatement) { - Flux txId = databaseClient.execute() // - .sql(idStatement) // + Flux txId = databaseClient.execute(idStatement) // .map((row, md) -> row.get(0)) // .all(); return prepareTransaction.thenMany(txId.concatWith(txId)); } - @Transactional public Flux shouldRollbackTransactionUsingTransactionalOperator(String insertStatement) { - return databaseClient.execute().sql(insertStatement) // + return databaseClient.execute(insertStatement) // .bind(0, 42055) // .bind(1, "SCHAUFELRADBAGGER") // .bindNull(2, Integer.class) // diff --git a/src/test/java/org/springframework/data/r2dbc/core/MySqlTransactionalDatabaseClientIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/core/MySqlTransactionalDatabaseClientIntegrationTests.java index 707eadf..65eb7b4 100644 --- a/src/test/java/org/springframework/data/r2dbc/core/MySqlTransactionalDatabaseClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/core/MySqlTransactionalDatabaseClientIntegrationTests.java @@ -22,14 +22,13 @@ import io.r2dbc.spi.ConnectionFactory; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Test; -import org.springframework.data.r2dbc.core.DatabaseClient; -import org.springframework.data.r2dbc.core.TransactionalDatabaseClient; + import org.springframework.data.r2dbc.testing.ExternalDatabase; import org.springframework.data.r2dbc.testing.MySqlTestSupport; import reactor.core.publisher.Mono; /** - * Integration tests for {@link TransactionalDatabaseClient} against MySQL. + * Transactional integration tests for {@link DatabaseClient} against MySQL. * * @author Mark Paluch */ @@ -63,7 +62,7 @@ public class MySqlTransactionalDatabaseClientIntegrationTests * batches every now and then. * @see: https://dev.mysql.com/doc/refman/5.7/en/innodb-information-schema-internal-data.html */ - return client.execute().sql(getInsertIntoLegosetStatement()) // + return client.execute(getInsertIntoLegosetStatement()) // .bind(0, 42055) // .bind(1, "SCHAUFELRADBAGGER") // .bindNull(2, Integer.class) // @@ -76,10 +75,4 @@ public class MySqlTransactionalDatabaseClientIntegrationTests protected String getCurrentTransactionIdStatement() { return "SELECT tx.trx_id FROM information_schema.innodb_trx tx WHERE tx.trx_mysql_thread_id = connection_id()"; } - - @Override - @Test - @Ignore("MySQL creates transactions only on interaction with transactional tables. BEGIN does not create a txid") - public void shouldManageUserTransaction() { - } } diff --git a/src/test/java/org/springframework/data/r2dbc/core/PostgresTransactionalDatabaseClientIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/core/PostgresTransactionalDatabaseClientIntegrationTests.java index 767117c..0a164c0 100644 --- a/src/test/java/org/springframework/data/r2dbc/core/PostgresTransactionalDatabaseClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/core/PostgresTransactionalDatabaseClientIntegrationTests.java @@ -5,12 +5,12 @@ import io.r2dbc.spi.ConnectionFactory; import javax.sql.DataSource; import org.junit.ClassRule; -import org.springframework.data.r2dbc.core.TransactionalDatabaseClient; + import org.springframework.data.r2dbc.testing.ExternalDatabase; import org.springframework.data.r2dbc.testing.PostgresTestSupport; /** - * Integration tests for {@link TransactionalDatabaseClient} against PostgreSQL. + * Transactional integration tests for {@link DatabaseClient} against PostgreSQL. * * @author Mark Paluch */ diff --git a/src/test/java/org/springframework/data/r2dbc/core/SqlServerTransactionalDatabaseClientIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/core/SqlServerTransactionalDatabaseClientIntegrationTests.java index 0c5f924..7698dba 100644 --- a/src/test/java/org/springframework/data/r2dbc/core/SqlServerTransactionalDatabaseClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/core/SqlServerTransactionalDatabaseClientIntegrationTests.java @@ -5,12 +5,12 @@ import io.r2dbc.spi.ConnectionFactory; import javax.sql.DataSource; import org.junit.ClassRule; -import org.springframework.data.r2dbc.core.TransactionalDatabaseClient; + import org.springframework.data.r2dbc.testing.ExternalDatabase; import org.springframework.data.r2dbc.testing.SqlServerTestSupport; /** - * Integration tests for {@link TransactionalDatabaseClient} against Microsoft SQL Server. + * Transactional integration tests for {@link DatabaseClient} against Microsoft SQL Server. * * @author Mark Paluch */ diff --git a/src/test/java/org/springframework/data/r2dbc/repository/AbstractR2dbcRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/repository/AbstractR2dbcRepositoryIntegrationTests.java index ba74677..c2c7d64 100644 --- a/src/test/java/org/springframework/data/r2dbc/repository/AbstractR2dbcRepositoryIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/repository/AbstractR2dbcRepositoryIntegrationTests.java @@ -40,9 +40,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataAccessException; import org.springframework.data.annotation.Id; import org.springframework.data.annotation.PersistenceConstructor; +import org.springframework.data.r2dbc.connectionfactory.R2dbcTransactionManager; import org.springframework.data.r2dbc.convert.MappingR2dbcConverter; import org.springframework.data.r2dbc.core.DefaultReactiveDataAccessStrategy; -import org.springframework.data.r2dbc.core.TransactionalDatabaseClient; import org.springframework.data.r2dbc.dialect.DialectResolver; import org.springframework.data.r2dbc.dialect.R2dbcDialect; import org.springframework.data.r2dbc.repository.support.R2dbcRepositoryFactory; @@ -52,6 +52,7 @@ import org.springframework.data.relational.core.mapping.Table; import org.springframework.data.repository.NoRepositoryBean; import org.springframework.data.repository.reactive.ReactiveCrudRepository; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.reactive.TransactionalOperator; /** * Abstract base class for integration tests for {@link LegoSetRepository} using {@link R2dbcRepositoryFactory}. @@ -60,9 +61,8 @@ import org.springframework.jdbc.core.JdbcTemplate; */ public abstract class AbstractR2dbcRepositoryIntegrationTests extends R2dbcIntegrationTestSupport { - private static RelationalMappingContext mappingContext = new RelationalMappingContext(); - @Autowired private LegoSetRepository repository; + @Autowired private ConnectionFactory connectionFactory; private JdbcTemplate jdbc; @Before @@ -174,25 +174,18 @@ public abstract class AbstractR2dbcRepositoryIntegrationTests extends R2dbcInteg @Test public void shouldInsertItemsTransactional() { - R2dbcDialect dialect = DialectResolver.getDialect(createConnectionFactory()); - DefaultReactiveDataAccessStrategy dataAccessStrategy = new DefaultReactiveDataAccessStrategy(dialect, - new MappingR2dbcConverter(mappingContext)); - TransactionalDatabaseClient client = TransactionalDatabaseClient.builder() - .connectionFactory(createConnectionFactory()).dataAccessStrategy(dataAccessStrategy).build(); - LegoSetRepository transactionalRepository = new R2dbcRepositoryFactory(client, dataAccessStrategy) - .getRepository(getRepositoryInterfaceType()); + R2dbcTransactionManager r2dbcTransactionManager = new R2dbcTransactionManager(connectionFactory); + TransactionalOperator rxtx = TransactionalOperator.create(r2dbcTransactionManager); LegoSet legoSet1 = new LegoSet(null, "SCHAUFELRADBAGGER", 12); LegoSet legoSet2 = new LegoSet(null, "FORSCHUNGSSCHIFF", 13); - Flux> transactional = client.inTransaction(db -> { + Mono> transactional = repository.save(legoSet1) // + .map(it -> jdbc.queryForMap("SELECT count(*) AS count FROM legoset")).as(rxtx::transactional); - return transactionalRepository.save(legoSet1) // - .map(it -> jdbc.queryForMap("SELECT count(*) AS count FROM legoset")); - }); - Mono> nonTransactional = transactionalRepository.save(legoSet2) // + Mono> nonTransactional = repository.save(legoSet2) // .map(it -> jdbc.queryForMap("SELECT count(*) AS count FROM legoset")); transactional.as(StepVerifier::create).expectNext(Collections.singletonMap("count", 0L)).verifyComplete(); diff --git a/src/test/java/org/springframework/data/r2dbc/repository/H2R2dbcRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/repository/H2R2dbcRepositoryIntegrationTests.java index 0c7b802..d0a7546 100644 --- a/src/test/java/org/springframework/data/r2dbc/repository/H2R2dbcRepositoryIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/repository/H2R2dbcRepositoryIntegrationTests.java @@ -23,6 +23,7 @@ import javax.sql.DataSource; import org.junit.runner.RunWith; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan.Filter; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; @@ -48,6 +49,7 @@ public class H2R2dbcRepositoryIntegrationTests extends AbstractR2dbcRepositoryIn includeFilters = @Filter(classes = H2LegoSetRepository.class, type = FilterType.ASSIGNABLE_TYPE)) static class IntegrationTestConfiguration extends AbstractR2dbcConfiguration { + @Bean @Override public ConnectionFactory connectionFactory() { return H2TestSupport.createConnectionFactory(); diff --git a/src/test/java/org/springframework/data/r2dbc/repository/MySqlR2dbcRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/repository/MySqlR2dbcRepositoryIntegrationTests.java index aeb61e8..a9e53da 100644 --- a/src/test/java/org/springframework/data/r2dbc/repository/MySqlR2dbcRepositoryIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/repository/MySqlR2dbcRepositoryIntegrationTests.java @@ -24,6 +24,7 @@ import javax.sql.DataSource; import org.junit.ClassRule; import org.junit.runner.RunWith; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan.Filter; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; @@ -52,6 +53,7 @@ public class MySqlR2dbcRepositoryIntegrationTests extends AbstractR2dbcRepositor includeFilters = @Filter(classes = MySqlLegoSetRepository.class, type = FilterType.ASSIGNABLE_TYPE)) static class IntegrationTestConfiguration extends AbstractR2dbcConfiguration { + @Bean @Override public ConnectionFactory connectionFactory() { return MySqlTestSupport.createConnectionFactory(database); diff --git a/src/test/java/org/springframework/data/r2dbc/repository/PostgresR2dbcRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/repository/PostgresR2dbcRepositoryIntegrationTests.java index 7fc6977..3c70489 100644 --- a/src/test/java/org/springframework/data/r2dbc/repository/PostgresR2dbcRepositoryIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/repository/PostgresR2dbcRepositoryIntegrationTests.java @@ -24,6 +24,7 @@ import javax.sql.DataSource; import org.junit.ClassRule; import org.junit.runner.RunWith; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan.Filter; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; @@ -52,6 +53,7 @@ public class PostgresR2dbcRepositoryIntegrationTests extends AbstractR2dbcReposi includeFilters = @Filter(classes = PostgresLegoSetRepository.class, type = FilterType.ASSIGNABLE_TYPE)) static class IntegrationTestConfiguration extends AbstractR2dbcConfiguration { + @Bean @Override public ConnectionFactory connectionFactory() { return PostgresTestSupport.createConnectionFactory(database); diff --git a/src/test/java/org/springframework/data/r2dbc/repository/SqlServerR2dbcRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/r2dbc/repository/SqlServerR2dbcRepositoryIntegrationTests.java index ca2f790..4442e3a 100644 --- a/src/test/java/org/springframework/data/r2dbc/repository/SqlServerR2dbcRepositoryIntegrationTests.java +++ b/src/test/java/org/springframework/data/r2dbc/repository/SqlServerR2dbcRepositoryIntegrationTests.java @@ -25,6 +25,7 @@ import org.junit.ClassRule; import org.junit.Ignore; import org.junit.runner.RunWith; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan.Filter; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; @@ -53,6 +54,7 @@ public class SqlServerR2dbcRepositoryIntegrationTests extends AbstractR2dbcRepos includeFilters = @Filter(classes = SqlServerLegoSetRepository.class, type = FilterType.ASSIGNABLE_TYPE)) static class IntegrationTestConfiguration extends AbstractR2dbcConfiguration { + @Bean @Override public ConnectionFactory connectionFactory() { return SqlServerTestSupport.createConnectionFactory(database);