#124 - Remove deprecated DatabaseClient.execute().sql(…) and TransactionalDatabaseClient.

This commit is contained in:
Mark Paluch
2019-07-15 13:39:23 +02:00
parent b1e97035f2
commit 3c3277eac4
31 changed files with 177 additions and 1353 deletions

View File

@@ -139,7 +139,7 @@ public class R2dbcApp {
DatabaseClient client = DatabaseClient.create(connectionFactory);
client.sql("CREATE TABLE person" +
client.execute("CREATE TABLE person" +
"(id VARCHAR(255) PRIMARY KEY," +
"name VARCHAR(255)," +
"age INT)")

View File

@@ -6,7 +6,7 @@ The following example shows what you need to include for minimal but fully funct
[source,java]
----
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
Mono<Void> completion = client.execute("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
----
@@ -14,7 +14,7 @@ Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY
It exposes intermediate, continuation, and terminal methods at each stage of the execution specification.
The example above uses `then()` to return a completion `Publisher` that completes as soon as the query (or queries, if the SQL query contains multiple statements) completes.
NOTE: `sql(…)` accepts either the SQL query string or a query `Supplier<String>` to defer the actual query creation until execution.
NOTE: `execute(…)` accepts either the SQL query string or a query `Supplier<String>` to defer the actual query creation until execution.
[[r2dbc.datbaseclient.queries]]
== Running Queries
@@ -26,7 +26,7 @@ The following example shows an `UPDATE` statement that returns the number of upd
[source,java]
----
Mono<Integer> affectedRows = client.sql("UPDATE person SET name = 'Joe'")
Mono<Integer> affectedRows = client.execute("UPDATE person SET name = 'Joe'")
.fetch().rowsUpdated();
----
@@ -36,7 +36,7 @@ You might have noticed the use of `fetch()` in the previous example.
[source,java]
----
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
Mono<Map<String, Object>> first = client.execute("SELECT id, name FROM person")
.fetch().first();
----
@@ -52,7 +52,7 @@ You can consume data with the following operators:
[source,java]
----
Flux<Person> all = client.sql("SELECT id, name FROM mytable")
Flux<Person> all = client.execute("SELECT id, name FROM mytable")
.as(Person.class)
.fetch().all();
----
@@ -69,7 +69,7 @@ The following example extracts the `id` column and emits its value:
[source,java]
----
Flux<String> names = client.sql("SELECT name FROM person")
Flux<String> names = client.execute("SELECT name FROM person")
.map((row, rowMetadata) -> row.get("id", String.class))
.all();
----
@@ -90,7 +90,7 @@ A typical application requires parameterized SQL statements to select or update
These are typically `SELECT` statements constrained by a `WHERE` clause or `INSERT`/`UPDATE` statements accepting input parameters.
Parameterized statements bear the risk of SQL injection if parameters are not escaped properly.
`DatabaseClient` leverages R2DBC's Bind API to eliminate the risk of SQL injection for query parameters.
You can provide a parameterized SQL statement with the `sql(…)` operator and bind parameters to the actual `Statement`.
You can provide a parameterized SQL statement with the `execute(…)` operator and bind parameters to the actual `Statement`.
Your R2DBC driver then executes the statement using prepared statements and parameter substitution.
Parameter binding supports various binding strategies:
@@ -102,7 +102,7 @@ The following example shows parameter binding for a query:
[source,java]
----
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
db.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
@@ -140,7 +140,7 @@ List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});
db.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
db.execute("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
----
@@ -150,6 +150,6 @@ A simpler variant using `IN` predicates:
[source,java]
----
db.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
db.execute("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
----

View File

@@ -18,12 +18,12 @@ TransactionalOperator operator = TransactionalOperator.create(tm); <1>
DatabaseClient client = DatabaseClient.create(connectionFactory);
Mono<Void> atomicOperation = client.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
Mono<Void> atomicOperation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34)
.fetch().rowsUpdated()
.then(client.sql("INSERT INTO contacts (id, name) VALUES(:id, :name)")
.then(client.execute("INSERT INTO contacts (id, name) VALUES(:id, :name)")
.bind("id", "joe")
.bind("name", "Joe")
.fetch().rowsUpdated())
@@ -69,12 +69,12 @@ class MyService {
@Transactional
public Mono<Void> insertPerson() {
return client.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
return client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34)
.fetch().rowsUpdated()
.then(client.sql("INSERT INTO contacts (id, name) VALUES(:id, :name)")
.then(client.execute("INSERT INTO contacts (id, name) VALUES(:id, :name)")
.bind("id", "joe")
.bind("name", "Joe")
.fetch().rowsUpdated())

View File

@@ -23,7 +23,6 @@ import reactor.util.function.Tuples;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.core.Ordered;
import org.springframework.dao.DataAccessResourceFailureException;
@@ -67,7 +66,7 @@ public abstract class ConnectionFactoryUtils {
* @throws DataAccessResourceFailureException if the attempt to get a {@link io.r2dbc.spi.Connection} failed
* @see #releaseConnection
*/
public static Mono<Tuple2<Connection, ConnectionFactory>> getConnection(ConnectionFactory connectionFactory) {
public static Mono<Connection> getConnection(ConnectionFactory connectionFactory) {
return doGetConnection(connectionFactory)
.onErrorMap(e -> new DataAccessResourceFailureException("Failed to obtain R2DBC Connection", e));
}
@@ -82,7 +81,7 @@ public abstract class ConnectionFactoryUtils {
* @param connectionFactory the {@link ConnectionFactory} to obtain Connections from.
* @return a R2DBC {@link io.r2dbc.spi.Connection} from the given {@link ConnectionFactory}.
*/
public static Mono<Tuple2<Connection, ConnectionFactory>> doGetConnection(ConnectionFactory connectionFactory) {
public static Mono<Connection> doGetConnection(ConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
@@ -138,56 +137,11 @@ public abstract class ConnectionFactoryUtils {
return con;
}) //
.map(conn -> Tuples.of(conn, connectionFactory)) //
.onErrorResume(NoTransactionException.class, e -> {
return Mono.subscriberContext().flatMap(it -> {
if (it.hasKey(ReactiveTransactionSynchronization.class)) {
ReactiveTransactionSynchronization synchronization = it.get(ReactiveTransactionSynchronization.class);
return obtainConnection(synchronization, connectionFactory);
}
return Mono.empty();
}).switchIfEmpty(Mono.defer(() -> {
return Mono.from(connectionFactory.create()).map(it -> Tuples.of(it, connectionFactory));
}));
return Mono.from(connectionFactory.create());
});
}
private static Mono<Tuple2<Connection, ConnectionFactory>> obtainConnection(
ReactiveTransactionSynchronization synchronization, ConnectionFactory connectionFactory) {
if (synchronization.isSynchronizationActive()) {
if (logger.isDebugEnabled()) {
logger.debug("Registering transaction synchronization for R2DBC Connection");
}
TransactionResources txContext = synchronization.getCurrentTransaction();
ConnectionFactory resource = txContext.getResource(ConnectionFactory.class);
Mono<Tuple2<Connection, ConnectionFactory>> attachNewConnection = Mono
.defer(() -> Mono.from(connectionFactory.create()).map(it -> {
if (logger.isDebugEnabled()) {
logger.debug("Fetching new R2DBC Connection from ConnectionFactory");
}
SingletonConnectionFactory s = new SingletonConnectionFactory(connectionFactory.getMetadata(), it);
txContext.registerResource(ConnectionFactory.class, s);
return Tuples.of(it, connectionFactory);
}));
return Mono.justOrEmpty(resource).flatMap(ConnectionFactoryUtils::createConnection)
.switchIfEmpty(attachNewConnection);
}
return Mono.empty();
}
/**
* Actually fetch a {@link Connection} from the given {@link ConnectionFactory}, defensively turning an unexpected
* {@code null} return value from {@link ConnectionFactory#create()} into an {@link IllegalStateException}.
@@ -198,12 +152,7 @@ public abstract class ConnectionFactoryUtils {
* @see ConnectionFactory#create()
*/
private static Mono<Connection> fetchConnection(ConnectionFactory connectionFactory) {
Publisher<? extends Connection> con = connectionFactory.create();
if (con == null) { // TODO: seriously why would it do that?
throw new IllegalStateException("ConnectionFactory returned null from getConnection(): " + connectionFactory);
}
return Mono.from(con);
return Mono.from(connectionFactory.create());
}
/**
@@ -226,40 +175,24 @@ public abstract class ConnectionFactoryUtils {
* Actually close the given {@link io.r2dbc.spi.Connection}, obtained from the given {@link ConnectionFactory}. Same
* as {@link #releaseConnection}, but preserving the original exception.
*
* @param con the {@link io.r2dbc.spi.Connection} to close if necessary.
* @param connection the {@link io.r2dbc.spi.Connection} to close if necessary.
* @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from (may be
* {@literal null}).
* @see #doGetConnection
*/
public static Mono<Void> doReleaseConnection(@Nullable io.r2dbc.spi.Connection con,
public static Mono<Void> doReleaseConnection(@Nullable io.r2dbc.spi.Connection connection,
@Nullable ConnectionFactory connectionFactory) {
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> {
ConnectionHolder conHolder = (ConnectionHolder) it.getResource(connectionFactory);
if (conHolder != null && connectionEquals(conHolder, con)) {
if (conHolder != null && connectionEquals(conHolder, connection)) {
// It's the transactional Connection: Don't close it.
conHolder.released();
}
return Mono.from(con.close());
return Mono.from(connection.close());
}).onErrorResume(NoTransactionException.class, e -> {
if (connectionFactory instanceof SingletonConnectionFactory) {
SingletonConnectionFactory factory = (SingletonConnectionFactory) connectionFactory;
if (logger.isDebugEnabled()) {
logger.debug("Releasing R2DBC Connection");
}
return factory.close(con);
}
if (logger.isDebugEnabled()) {
logger.debug("Closing R2DBC Connection");
}
return Mono.from(con.close());
return doCloseConnection(connection, connectionFactory);
});
}
@@ -287,55 +220,23 @@ public abstract class ConnectionFactoryUtils {
*/
public static Mono<Void> doCloseConnection(Connection connection, @Nullable ConnectionFactory connectionFactory) {
if (!(connectionFactory instanceof SingletonConnectionFactory)
|| ((SingletonConnectionFactory) connectionFactory).shouldClose(connection)) {
if (!(connectionFactory instanceof SmartConnectionFactory)
|| ((SmartConnectionFactory) connectionFactory).shouldClose(connection)) {
SingletonConnectionFactory factory = (SingletonConnectionFactory) connectionFactory;
return factory.close(connection).then(Mono.from(connection.close()));
if (logger.isDebugEnabled()) {
logger.debug("Closing R2DBC Connection");
}
return Mono.from(connection.close());
}
return Mono.empty();
}
/**
* Obtain the currently {@link ReactiveTransactionSynchronization} from the current subscriber
* {@link reactor.util.context.Context}.
*
* @throws NoTransactionException if no active {@link ReactiveTransactionSynchronization} is associated with the
* current subscription.
* @see Mono#subscriberContext()
* @see ReactiveTransactionSynchronization
*/
public static Mono<ReactiveTransactionSynchronization> currentReactiveTransactionSynchronization() {
return Mono.subscriberContext().filter(it -> it.hasKey(ReactiveTransactionSynchronization.class)) //
.switchIfEmpty(Mono.error(new NoTransactionException(
"Transaction management is not enabled. Make sure to register ReactiveTransactionSynchronization in the subscriber Context!"))) //
.map(it -> it.get(ReactiveTransactionSynchronization.class));
}
/**
* Obtain the currently active {@link ReactiveTransactionSynchronization} from the current subscriber
* {@link reactor.util.context.Context}.
*
* @throws NoTransactionException if no active {@link ReactiveTransactionSynchronization} is associated with the
* current subscription.
* @see Mono#subscriberContext()
* @see ReactiveTransactionSynchronization
*/
public static Mono<ReactiveTransactionSynchronization> currentActiveReactiveTransactionSynchronization() {
return currentReactiveTransactionSynchronization()
.filter(ReactiveTransactionSynchronization::isSynchronizationActive) //
.switchIfEmpty(Mono.error(new NoTransactionException("ReactiveTransactionSynchronization not active!")));
}
/**
* Obtain the {@link io.r2dbc.spi.ConnectionFactory} from the current subscriber {@link reactor.util.context.Context}.
*
* @see Mono#subscriberContext()
* @see ReactiveTransactionSynchronization
* @see TransactionResources
* @see TransactionSynchronizationManager
*/
public static Mono<ConnectionFactory> currentConnectionFactory(ConnectionFactory connectionFactory) {
@@ -347,8 +248,7 @@ public abstract class ConnectionFactoryUtils {
return true;
}
return false;
}).map(it -> connectionFactory) //
.onErrorResume(NoTransactionException.class, ConnectionFactoryUtils::obtainDefaultConnectionFactory);
}).map(it -> connectionFactory);
}
/**
@@ -410,20 +310,6 @@ public abstract class ConnectionFactoryUtils {
return order;
}
/**
* @param e
* @return an {@link Mono#error(Throwable) error} if not transaction present.
*/
private static Mono<? extends ConnectionFactory> obtainDefaultConnectionFactory(NoTransactionException e) {
return currentActiveReactiveTransactionSynchronization().map(synchronization -> {
TransactionResources currentSynchronization = synchronization.getCurrentTransaction();
return currentSynchronization.getResource(ConnectionFactory.class);
}).switchIfEmpty(Mono.error(
new DataAccessResourceFailureException("Cannot extract ConnectionFactory from current TransactionContext!")));
}
/**
* Create a {@link Connection} via the given {@link ConnectionFactory#create() factory} and return a {@link Tuple2}
* associating the {@link Connection} with its creating {@link ConnectionFactory}.

View File

@@ -1,53 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.connectionfactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.util.Assert;
/**
* Default implementation of {@link TransactionResources}.
*
* @author Mark Paluch
*/
class DefaultTransactionResources implements TransactionResources {
private Map<Class<?>, Object> items = new ConcurrentHashMap<>();
/*
* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.connectionfactory.TransactionResources#registerResource(java.lang.Class, java.lang.Object)
*/
@Override
public <T> void registerResource(Class<T> key, T value) {
Assert.state(!items.containsKey(key), () -> String.format("Resource for %s is already bound", key));
items.put(key, value);
}
/*
* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.connectionfactory.TransactionResources#getResource(java.lang.Class)
*/
@SuppressWarnings("unchecked")
@Override
public <T> T getResource(Class<T> key) {
return (T) items.get(key);
}
}

View File

@@ -70,8 +70,7 @@ import org.springframework.util.Assert;
* @see TransactionAwareConnectionFactoryProxy
* @see DatabaseClient
*/
public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
implements InitializingBean {
public class R2dbcTransactionManager extends AbstractReactiveTransactionManager implements InitializingBean {
private ConnectionFactory connectionFactory;
@@ -473,12 +472,12 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* ConnectionFactory transaction object, representing a ConnectionHolder. Used as transaction object by
* ConnectionFactoryTransactionManager.
*/
private static class ConnectionFactoryTransactionObject extends R2dbcTransactionObjectSupport {
private static class ConnectionFactoryTransactionObject {
private boolean newConnectionHolder;
void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
super.setConnectionHolder(connectionHolder);
setConnectionHolder(connectionHolder);
this.newConnectionHolder = newConnectionHolder;
}
@@ -489,5 +488,41 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
void setRollbackOnly() {
getConnectionHolder().setRollbackOnly();
}
@Nullable private ConnectionHolder connectionHolder;
@Nullable private IsolationLevel previousIsolationLevel;
private boolean savepointAllowed = false;
public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) {
this.connectionHolder = connectionHolder;
}
public ConnectionHolder getConnectionHolder() {
Assert.state(this.connectionHolder != null, "No ConnectionHolder available");
return this.connectionHolder;
}
public boolean hasConnectionHolder() {
return (this.connectionHolder != null);
}
public void setPreviousIsolationLevel(@Nullable IsolationLevel previousIsolationLevel) {
this.previousIsolationLevel = previousIsolationLevel;
}
@Nullable
public IsolationLevel getPreviousIsolationLevel() {
return this.previousIsolationLevel;
}
public void setSavepointAllowed(boolean savepointAllowed) {
this.savepointAllowed = savepointAllowed;
}
public boolean isSavepointAllowed() {
return this.savepointAllowed;
}
}
}

View File

@@ -1,66 +0,0 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.connectionfactory;
import io.r2dbc.spi.IsolationLevel;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Convenient base class for R2DBC-aware transaction objects. Can contain a {@link ConnectionHolder} with a R2DBC
* {@link io.r2dbc.spi.Connection}.
*
* @author Mark Paluch
* @see R2dbcTransactionManager
*/
public abstract class R2dbcTransactionObjectSupport {
@Nullable private ConnectionHolder connectionHolder;
@Nullable private IsolationLevel previousIsolationLevel;
private boolean savepointAllowed = false;
public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) {
this.connectionHolder = connectionHolder;
}
public ConnectionHolder getConnectionHolder() {
Assert.state(this.connectionHolder != null, "No ConnectionHolder available");
return this.connectionHolder;
}
public boolean hasConnectionHolder() {
return (this.connectionHolder != null);
}
public void setPreviousIsolationLevel(@Nullable IsolationLevel previousIsolationLevel) {
this.previousIsolationLevel = previousIsolationLevel;
}
@Nullable
public IsolationLevel getPreviousIsolationLevel() {
return this.previousIsolationLevel;
}
public void setSavepointAllowed(boolean savepointAllowed) {
this.savepointAllowed = savepointAllowed;
}
public boolean isSavepointAllowed() {
return this.savepointAllowed;
}
}

View File

@@ -1,87 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.connectionfactory;
import java.util.Stack;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Central delegate that manages transactional resources. To be used by resource management code but not by typical
* application code.
* <p>
* Supports a list of transactional resources if synchronization is active.
* <p>
* Resource management code should check for subscriber {@link reactor.util.context.Context}-bound resources, e.g. R2DBC
* Connections using {@link TransactionResources#getResource(Class)}. Such code is normally not supposed to bind
* resources, as this is the responsibility of transaction managers. A further option is to lazily bind on first use if
* transaction synchronization is active, for performing transactions that span an arbitrary number of resources.
* <p>
* Transaction synchronization must be activated and deactivated by a transaction manager by registering
* {@link ReactiveTransactionSynchronization} in the {@link reactor.util.context.Context subscriber context}.
*
* @author Mark Paluch
*/
public class ReactiveTransactionSynchronization {
private Stack<TransactionResources> resources = new Stack<>();
/**
* Return if transaction synchronization is active for the current {@link reactor.util.context.Context}. Can be called
* before register to avoid unnecessary instance creation.
*/
public boolean isSynchronizationActive() {
return !resources.isEmpty();
}
/**
* Create a new transaction span and register a {@link TransactionResources} instance.
*
* @param transactionResources must not be {@literal null}.
*/
public void registerTransaction(TransactionResources transactionResources) {
Assert.notNull(transactionResources, "TransactionContext must not be null!");
resources.push(transactionResources);
}
/**
* Unregister a transaction span and by removing {@link TransactionResources} instance.
*
* @param transactionResources must not be {@literal null}.
*/
public void unregisterTransaction(TransactionResources transactionResources) {
Assert.notNull(transactionResources, "TransactionContext must not be null!");
resources.remove(transactionResources);
}
/**
* @return obtain the current {@link TransactionResources} or {@literal null} if none is present.
*/
@Nullable
public TransactionResources getCurrentTransaction() {
if (!resources.isEmpty()) {
return resources.peek();
}
return null;
}
}

View File

@@ -1,91 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.connectionfactory;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
/**
* Connection holder, wrapping a R2DBC Connection.
* {@link org.springframework.data.r2dbc.core.TransactionalDatabaseClient} binds instances of this class to the
* {@link TransactionResources} for a specific subscription.
*
* @author Mark Paluch
*/
class SingletonConnectionFactory implements SmartConnectionFactory {
private final ConnectionFactoryMetadata metadata;
private final Connection connection;
private final Mono<Connection> connectionMono;
private final AtomicInteger refCount = new AtomicInteger();
SingletonConnectionFactory(ConnectionFactoryMetadata metadata, Connection connection) {
this.metadata = metadata;
this.connection = connection;
this.connectionMono = Mono.just(connection);
}
/*
* (non-Javadoc)
* @see io.r2dbc.spi.ConnectionFactory#create()
*/
@Override
public Publisher<? extends Connection> create() {
if (refCount.get() == -1) {
throw new IllegalStateException("Connection is closed!");
}
return connectionMono.doOnSubscribe(s -> refCount.incrementAndGet());
}
/*
* (non-Javadoc)
* @see io.r2dbc.spi.ConnectionFactory#getMetadata()
*/
@Override
public ConnectionFactoryMetadata getMetadata() {
return metadata;
}
private boolean connectionEquals(Connection connection) {
return this.connection == connection;
}
/*
* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.connectionfactory.SmartConnectionFactory#shouldClose(io.r2dbc.spi.Connection)
*/
@Override
public boolean shouldClose(Connection connection) {
return refCount.get() == 1;
}
Mono<Void> close(Connection connection) {
if (connectionEquals(connection)) {
return Mono.<Void> empty().doOnSubscribe(s -> refCount.decrementAndGet());
}
throw new IllegalArgumentException("Connection is not associated with this connection factory");
}
}

View File

@@ -15,30 +15,27 @@
*/
package org.springframework.data.r2dbc.connectionfactory;
import static org.springframework.util.ReflectionUtils.*;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Wrapped;
import reactor.core.publisher.Mono;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Wrapped;
import org.springframework.data.r2dbc.core.DatabaseClient;
import org.springframework.lang.Nullable;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
/**
* Proxy for a target R2DBC {@link ConnectionFactory}, adding awareness of Spring-managed transactions.
* <p>
* Data access code that should remain unaware of Spring's data access support can work with this proxy to seamlessly
* participate in Spring-managed transactions. Note that the transaction manager, for example
* {@link R2dbcTransactionManager}, still needs to work with the underlying {@link ConnectionFactory},
* <i>not</i> with this proxy.
* {@link R2dbcTransactionManager}, still needs to work with the underlying {@link ConnectionFactory}, <i>not</i> with
* this proxy.
* <p>
* <b>Make sure that {@link TransactionAwareConnectionFactoryProxy} is the outermost {@link ConnectionFactory} of a
* chain of {@link ConnectionFactory} proxies/adapters.</b> {@link TransactionAwareConnectionFactoryProxy} can delegate
@@ -101,14 +98,15 @@ public class TransactionAwareConnectionFactoryProxy extends DelegatingConnection
* @see ConnectionFactoryUtils#doReleaseConnection
*/
protected Mono<Connection> getTransactionAwareConnectionProxy(ConnectionFactory targetConnectionFactory) {
return ConnectionFactoryUtils.getConnection(targetConnectionFactory).map(TransactionAwareConnectionFactoryProxy::proxyConnection);
return ConnectionFactoryUtils.getConnection(targetConnectionFactory)
.map(it -> proxyConnection(it, targetConnectionFactory));
}
private static Connection proxyConnection(Tuple2<Connection, ConnectionFactory> connectionConnectionFactoryTuple) {
private static Connection proxyConnection(Connection connection, ConnectionFactory targetConnectionFactory) {
return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
new Class<?>[]{ConnectionProxy.class},
new TransactionAwareInvocationHandler(connectionConnectionFactoryTuple.getT1(), connectionConnectionFactoryTuple.getT2()));
new Class<?>[] { ConnectionProxy.class },
new TransactionAwareInvocationHandler(connection, targetConnectionFactory));
}
/**

View File

@@ -1,61 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.connectionfactory;
import reactor.core.publisher.Mono;
import org.springframework.data.r2dbc.core.TransactionalDatabaseClient;
/**
* Transaction context for an ongoing transaction synchronization allowing to register transactional resources.
* <p>
* Supports one resource per key without overwriting, that is, a resource needs to be removed before a new one can be
* set for the same key.
* <p>
* Primarily used by {@link ConnectionFactoryUtils} but can be also used by application code to register resources that
* should be bound to a transaction.
*
* @author Mark Paluch
* @see TransactionalDatabaseClient
*/
public interface TransactionResources {
/**
* Creates a new empty {@link TransactionResources}.
*
* @return the empty {@link TransactionResources}.
*/
static TransactionResources create() {
return new DefaultTransactionResources();
}
/**
* Retrieve a resource from this context identified by {@code key}.
*
* @param key the resource key.
* @return the resource emitted through {@link Mono} or {@link Mono#empty()} if the resource was not found.
*/
<T> T getResource(Class<T> key);
/**
* Register a resource in this context.
*
* @param key the resource key.
* @param value can be a subclass of the {@code key} type.
* @throws IllegalStateException if a resource is already bound under {@code key}.
*/
<T> void registerResource(Class<T> key, T value);
}

View File

@@ -18,7 +18,6 @@ package org.springframework.data.r2dbc.connectionfactory.init;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import org.springframework.dao.DataAccessException;
import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils;
@@ -47,7 +46,7 @@ public abstract class DatabasePopulatorUtils {
Assert.notNull(populator, "DatabasePopulator must not be null");
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
return Mono.usingWhen(ConnectionFactoryUtils.getConnection(connectionFactory).map(Tuple2::getT1), //
return Mono.usingWhen(ConnectionFactoryUtils.getConnection(connectionFactory), //
populator::populate, //
it -> ConnectionFactoryUtils.releaseConnection(it, connectionFactory), //
it -> ConnectionFactoryUtils.releaseConnection(it, connectionFactory))

View File

@@ -51,12 +51,9 @@ public interface DatabaseClient {
* the exchange. The SQL string can contain either native parameter bind markers or named parameters (e.g.
* {@literal :foo, :bar}) when {@link NamedParameterExpander} is enabled.
*
* @see NamedParameterExpander
* @see DatabaseClient.Builder#namedParameters(NamedParameterExpander)
* @param sql must not be {@literal null} or empty.
* @return a new {@link GenericExecuteSpec}.
* @see NamedParameterExpander
* @see DatabaseClient.Builder#namedParameters(NamedParameterExpander)
* @see NamedParameterExpander * @see DatabaseClient.Builder#namedParameters(boolean)
*/
GenericExecuteSpec execute(String sql);
@@ -71,19 +68,11 @@ public interface DatabaseClient {
* @param sqlSupplier must not be {@literal null}.
* @return a new {@link GenericExecuteSpec}.
* @see NamedParameterExpander
* @see DatabaseClient.Builder#namedParameters(NamedParameterExpander)
* @see DatabaseClient.Builder#namedParameters(boolean)
* @see PreparedOperation
*/
GenericExecuteSpec execute(Supplier<String> sqlSupplier);
/**
* Prepare an SQL call returning a result.
*
* @deprecated will be removed with 1.0 M3. Use {@link #execute(String)} directly.
*/
@Deprecated
SqlSpec execute();
/**
* Prepare an SQL SELECT call.
*/
@@ -181,41 +170,6 @@ public interface DatabaseClient {
DatabaseClient build();
}
/**
* Contract for specifying a SQL call along with options leading to the exchange. The SQL string can contain either
* native parameter bind markers (e.g. {@literal $1, $2} for Postgres, {@literal @P0, @P1} for SQL Server) or named
* parameters (e.g. {@literal :foo, :bar}) when {@link NamedParameterExpander} is enabled.
* <p>
* Accepts {@link PreparedOperation} as SQL and binding {@link Supplier}.
* </p>
*
* @see NamedParameterExpander
* @see DatabaseClient.Builder#namedParameters(NamedParameterExpander)
* @deprecated use {@code DatabaseClient.execute(…)} directly.
*/
@Deprecated
interface SqlSpec {
/**
* Specify a static {@code sql} string to execute.
*
* @param sql must not be {@literal null} or empty.
* @return a new {@link GenericExecuteSpec}.
*/
@Deprecated
GenericExecuteSpec sql(String sql);
/**
* Specify a static {@link Supplier SQL supplier} that provides SQL to execute.
*
* @param sqlSupplier must not be {@literal null}.
* @return a new {@link GenericExecuteSpec}.
* @see PreparedOperation
*/
@Deprecated
GenericExecuteSpec sql(Supplier<String> sqlSupplier);
}
/**
* Contract for specifying a SQL call along with options leading to the exchange.
*/

View File

@@ -25,7 +25,6 @@ import io.r2dbc.spi.Statement;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
@@ -46,6 +45,7 @@ import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Pageable;
@@ -98,11 +98,6 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor {
return this.builder;
}
@Override
public SqlSpec execute() {
return new DefaultSqlSpec();
}
@Override
public SelectFromSpec select() {
return new DefaultSelectFromSpec();
@@ -201,7 +196,7 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor {
* @return a {@link Mono} able to emit a {@link Connection}.
*/
protected Mono<Connection> getConnection() {
return ConnectionFactoryUtils.getConnection(obtainConnectionFactory()).map(Tuple2::getT1);
return ConnectionFactoryUtils.getConnection(obtainConnectionFactory());
}
/**
@@ -307,27 +302,6 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor {
});
}
/**
* Default {@link DatabaseClient.SqlSpec} implementation.
*/
private class DefaultSqlSpec implements SqlSpec {
@Override
public GenericExecuteSpec sql(String sql) {
Assert.hasText(sql, "SQL must not be null or empty!");
return sql(() -> sql);
}
@Override
public GenericExecuteSpec sql(Supplier<String> sqlSupplier) {
Assert.notNull(sqlSupplier, "SQL Supplier must not be null!");
return createGenericExecuteSpec(sqlSupplier);
}
}
/**
* Base class for {@link DatabaseClient.GenericExecuteSpec} implementations.
*/

View File

@@ -1,150 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.core;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils;
import org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization;
import org.springframework.data.r2dbc.connectionfactory.TransactionResources;
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
import org.springframework.transaction.NoTransactionException;
/**
* Default implementation of a {@link TransactionalDatabaseClient}.
*
* @author Mark Paluch
*/
class DefaultTransactionalDatabaseClient extends DefaultDatabaseClient implements TransactionalDatabaseClient {
DefaultTransactionalDatabaseClient(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator,
ReactiveDataAccessStrategy dataAccessStrategy, boolean namedParameters,
DefaultDatabaseClientBuilder builder) {
super(connector, exceptionTranslator, dataAccessStrategy, namedParameters, builder);
}
@Override
public TransactionalDatabaseClient.Builder mutate() {
return (TransactionalDatabaseClient.Builder) super.mutate();
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#beginTransaction()
*/
@Override
public Mono<Void> beginTransaction() {
Mono<TransactionResources> transactional = ConnectionFactoryUtils.currentReactiveTransactionSynchronization() //
.map(synchronization -> {
TransactionResources transactionResources = TransactionResources.create();
// TODO: This Tx management code creating a TransactionContext. Find a better place.
synchronization.registerTransaction(transactionResources);
return transactionResources;
});
return transactional.flatMap(it -> {
return ConnectionFactoryUtils.doGetConnection(obtainConnectionFactory());
}).flatMap(it -> Mono.from(it.getT1().beginTransaction()));
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#commitTransaction()
*/
@Override
public Mono<Void> commitTransaction() {
return cleanup(Connection::commitTransaction);
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#rollbackTransaction()
*/
@Override
public Mono<Void> rollbackTransaction() {
return cleanup(Connection::rollbackTransaction);
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#inTransaction(java.util.function.Function)
*/
@Override
public <T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> callback) {
return Flux.usingWhen(beginTransaction().thenReturn(this), callback, //
DefaultTransactionalDatabaseClient::commitTransaction, //
DefaultTransactionalDatabaseClient::rollbackTransaction, //
DefaultTransactionalDatabaseClient::rollbackTransaction) //
.subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClient#getConnection()
*/
@Override
protected Mono<Connection> getConnection() {
return ConnectionFactoryUtils.getConnection(obtainConnectionFactory()).map(Tuple2::getT1);
}
/**
* Execute a transactional cleanup. Also, deregister the current {@link TransactionResources synchronization} element.
*/
private static Mono<Void> cleanup(Function<Connection, ? extends Publisher<Void>> callback) {
return ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization() //
.flatMap(synchronization -> {
TransactionResources currentSynchronization = synchronization.getCurrentTransaction();
ConnectionFactory connectionFactory = currentSynchronization.getResource(ConnectionFactory.class);
if (connectionFactory == null) {
throw new NoTransactionException("No ConnectionFactory attached");
}
return Mono.from(connectionFactory.create())
.flatMap(connection -> Mono.from(callback.apply(connection))
.then(ConnectionFactoryUtils.releaseConnection(connection, connectionFactory))
.then(ConnectionFactoryUtils.closeConnection(connection, connectionFactory))) // TODO: Is this rather
// related to
// TransactionContext
// cleanup?
.doFinally(s -> synchronization.unregisterTransaction(currentSynchronization));
});
}
/**
* Potentially register a {@link ReactiveTransactionSynchronization} in the {@link Context} if no synchronization
* object is registered.
*
* @param context the subscriber context.
* @return subscriber context with a registered synchronization.
*/
static Context withTransactionSynchronization(Context context) {
// associate synchronizer object to host transactional resources.
// TODO: Should be moved to a better place.
return context.put(ReactiveTransactionSynchronization.class,
context.getOrDefault(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization()));
}
}

View File

@@ -1,105 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.core;
import io.r2dbc.spi.ConnectionFactory;
import java.util.function.Consumer;
import org.springframework.data.r2dbc.core.DatabaseClient.Builder;
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
import org.springframework.util.Assert;
/**
* @author Mark Paluch
*/
class DefaultTransactionalDatabaseClientBuilder extends DefaultDatabaseClientBuilder
implements TransactionalDatabaseClient.Builder {
DefaultTransactionalDatabaseClientBuilder() {}
DefaultTransactionalDatabaseClientBuilder(DefaultDatabaseClientBuilder other) {
super(other);
Assert.notNull(other, "DefaultDatabaseClientBuilder must not be null!");
}
@Override
public DatabaseClient.Builder clone() {
return new DefaultTransactionalDatabaseClientBuilder(this);
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#connectionFactory(io.r2dbc.spi.ConnectionFactory)
*/
@Override
public TransactionalDatabaseClient.Builder connectionFactory(ConnectionFactory factory) {
super.connectionFactory(factory);
return this;
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#exceptionTranslator(org.springframework.data.r2dbc.support.R2dbcExceptionTranslator)
*/
@Override
public TransactionalDatabaseClient.Builder exceptionTranslator(R2dbcExceptionTranslator exceptionTranslator) {
super.exceptionTranslator(exceptionTranslator);
return this;
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#dataAccessStrategy(org.springframework.data.r2dbc.function.ReactiveDataAccessStrategy)
*/
@Override
public TransactionalDatabaseClient.Builder dataAccessStrategy(ReactiveDataAccessStrategy accessStrategy) {
super.dataAccessStrategy(accessStrategy);
return this;
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#dataAccessStrategy(boolean)
*/
@Override
public TransactionalDatabaseClient.Builder namedParameters(boolean enabled) {
super.namedParameters(enabled);
return this;
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#apply(java.util.function.Consumer)
*/
@Override
public TransactionalDatabaseClient.Builder apply(Consumer<Builder> builderConsumer) {
super.apply(builderConsumer);
return this;
}
/* (non-Javadoc)
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#build()
*/
@Override
public TransactionalDatabaseClient build() {
return (TransactionalDatabaseClient) super.build();
}
@Override
protected DatabaseClient doBuild(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator,
ReactiveDataAccessStrategy accessStrategy, boolean namedParameters,
DefaultDatabaseClientBuilder builder) {
return new DefaultTransactionalDatabaseClient(connector, exceptionTranslator, accessStrategy, namedParameters,
builder);
}
}

View File

@@ -1,217 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.core;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.r2dbc.connectionfactory.TransactionResources;
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.util.Assert;
/**
* {@link DatabaseClient} that participates in an ongoing transaction if the subscription happens within a hosted
* transaction. Alternatively, transactions can be started and cleaned up using {@link #beginTransaction()} and
* {@link #commitTransaction()}.
* <p>
* Transactional resources are bound to
* {@link org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization} through nested
* {@link TransactionContext} enabling nested (parallel) transactions. The simplemost approach to use transactions is by
* using {@link #inTransaction(Function)} which will start a transaction and commit it on successful termination. The
* callback allows execution of multiple statements within the same transaction.
*
* <pre class="code">
* Flux<Integer> transactionalFlux = databaseClient.inTransaction(db -> {
*
* return db.execute("INSERT INTO person (id, firstname, lastname) VALUES(:id, :firstname, :lastname)") //
* .bind("id", 1) //
* .bind("firstname", "Walter") //
* .bind("lastname", "White") //
* .fetch().rowsUpdated();
* });
* </pre>
*
* Alternatively, transactions can be controlled by using {@link #beginTransaction()} and {@link #commitTransaction()}
* methods. This approach requires {@link #enableTransactionSynchronization(Publisher) enabling of transaction
* synchronization} for the transactional operation.
*
* <pre class="code">
* Mono<Void> mono = databaseClient.beginTransaction()
* .then(databaseClient.execute()
* .execute("INSERT INTO person (id, firstname, lastname) VALUES(:id, :firstname, :lastname)") //
* .bind("id", 1) //
* .bind("firstname", "Walter") //
* .bind("lastname", "White") //
* .fetch().rowsUpdated())
* .then(databaseClient.commitTransaction());
*
* Mono<Void> transactionalMono = databaseClient.enableTransactionSynchronization(mono);
* </pre>
* <p>
* This {@link DatabaseClient} can be safely used without transaction synchronization to invoke database functionality
* in auto-commit transactions.
*
* @author Mark Paluch
* @see #inTransaction(Function)
* @see #enableTransactionSynchronization(Publisher)
* @see #beginTransaction()
* @see #commitTransaction()
* @see #rollbackTransaction()
* @see org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization
* @see TransactionResources
* @see org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils
* @deprecated Use {@link DatabaseClient} in combination with {@link TransactionalOperator}.
*/
@Deprecated
public interface TransactionalDatabaseClient extends DatabaseClient {
/**
* Start a transaction and bind connection resources to the subscriber context.
*
* @return
*/
Mono<Void> beginTransaction();
/**
* Commit a transaction and unbind connection resources from the subscriber context.
*
* @return
* @throws org.springframework.transaction.NoTransactionException if no transaction is ongoing.
*/
Mono<Void> commitTransaction();
/**
* Rollback a transaction and unbind connection resources from the subscriber context.
*
* @return
* @throws org.springframework.transaction.NoTransactionException if no transaction is ongoing.
*/
Mono<Void> rollbackTransaction();
/**
* Execute a {@link Function} accepting a {@link DatabaseClient} within a managed transaction. {@link Exception Error
* signals} cause the transaction to be rolled back.
*
* @param callback
* @return the callback result.
*/
<T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> callback);
/**
* Enable transaction management so that connections can be bound to the subscription.
*
* @param publisher must not be {@literal null}.
* @return the Transaction-enabled {@link Mono}.
*/
default <T> Mono<T> enableTransactionSynchronization(Mono<T> publisher) {
Assert.notNull(publisher, "Publisher must not be null!");
return publisher.subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
}
/**
* Enable transaction management so that connections can be bound to the subscription.
*
* @param publisher must not be {@literal null}.
* @return the Transaction-enabled {@link Flux}.
*/
default <T> Flux<T> enableTransactionSynchronization(Publisher<T> publisher) {
Assert.notNull(publisher, "Publisher must not be null!");
return Flux.from(publisher).subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
}
/**
* Return a builder to mutate properties of this database client.
*/
TransactionalDatabaseClient.Builder mutate();
// Static, factory methods
/**
* A variant of {@link #create(ConnectionFactory)} that accepts a {@link io.r2dbc.spi.ConnectionFactory}.
*/
static TransactionalDatabaseClient create(ConnectionFactory factory) {
return (TransactionalDatabaseClient) new DefaultTransactionalDatabaseClientBuilder().connectionFactory(factory)
.build();
}
/**
* Obtain a {@code DatabaseClient} builder.
*/
static TransactionalDatabaseClient.Builder builder() {
return new DefaultTransactionalDatabaseClientBuilder();
}
/**
* A mutable builder for creating a {@link TransactionalDatabaseClient}.
*/
interface Builder extends DatabaseClient.Builder {
/**
* Configures the {@link ConnectionFactory R2DBC connector}.
*
* @param factory must not be {@literal null}.
* @return {@code this} {@link Builder}.
*/
Builder connectionFactory(ConnectionFactory factory);
/**
* Configures a {@link R2dbcExceptionTranslator}.
*
* @param exceptionTranslator must not be {@literal null}.
* @return {@code this} {@link Builder}.
*/
Builder exceptionTranslator(R2dbcExceptionTranslator exceptionTranslator);
/**
* Configures a {@link ReactiveDataAccessStrategy}.
*
* @param accessStrategy must not be {@literal null}.
* @return {@code this} {@link Builder}.
*/
Builder dataAccessStrategy(ReactiveDataAccessStrategy accessStrategy);
/**
* Configures whether to use named parameter expansion. Defaults to {@literal true}.
*
* @param enabled {@literal true} to use named parameter expansion. {@literal false} to disable named parameter
* expansion.
* @return {@code this} {@link DatabaseClient.Builder}.
* @see NamedParameterExpander
*/
Builder namedParameters(boolean enabled);
/**
* Configures a {@link Consumer} to configure this builder.
*
* @param builderConsumer must not be {@literal null}.
* @return {@code this} {@link Builder}.
*/
Builder apply(Consumer<DatabaseClient.Builder> builderConsumer);
@Override
TransactionalDatabaseClient build();
}
}

View File

@@ -70,7 +70,7 @@ public class H2IntegrationTests {
jdbc.execute("INSERT INTO legoset (id, name, manual) VALUES(42055, 'SCHAUFELRADBAGGER', 12)");
databaseClient.execute().sql("SELECT COUNT(*) FROM legoset") //
databaseClient.execute("SELECT COUNT(*) FROM legoset") //
.as(Long.class) //
.fetch() //
.all() //

View File

@@ -1,117 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.r2dbc.connectionfactory;
import static org.mockito.Mockito.*;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils;
import org.springframework.data.r2dbc.connectionfactory.ReactiveTransactionSynchronization;
import org.springframework.data.r2dbc.connectionfactory.TransactionResources;
import org.springframework.transaction.NoTransactionException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
* Unit tests for {@link ConnectionFactoryUtils}.
*
* @author Mark Paluch
* @author Christoph Strobl
*/
public class ConnectionFactoryUtilsUnitTests {
@Test // gh-107
public void currentReactiveTransactionSynchronizationShouldReportSynchronization() {
ConnectionFactoryUtils.currentReactiveTransactionSynchronization() //
.subscriberContext(
it -> it.put(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization()))
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
}
@Test // gh-107
public void currentReactiveTransactionSynchronizationShouldFailWithoutTxMgmt() {
ConnectionFactoryUtils.currentReactiveTransactionSynchronization() //
.as(StepVerifier::create) //
.expectError(NoTransactionException.class) //
.verify();
}
@Test // gh-107
public void currentActiveReactiveTransactionSynchronizationShouldReportSynchronization() {
ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization() //
.subscriberContext(it -> {
ReactiveTransactionSynchronization sync = new ReactiveTransactionSynchronization();
sync.registerTransaction(TransactionResources.create());
return it.put(ReactiveTransactionSynchronization.class, sync);
}).as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
}
@Test // gh-107
public void currentActiveReactiveTransactionSynchronization() {
ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization() //
.subscriberContext(
it -> it.put(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization()))
.as(StepVerifier::create) //
.expectError(NoTransactionException.class) //
.verify();
}
@Test // gh-107
public void currentConnectionFactoryShouldReportConnectionFactory() {
ConnectionFactory factoryMock = mock(ConnectionFactory.class);
ConnectionFactoryUtils.currentConnectionFactory(factoryMock) //
.subscriberContext(it -> {
ReactiveTransactionSynchronization sync = new ReactiveTransactionSynchronization();
TransactionResources resources = TransactionResources.create();
resources.registerResource(ConnectionFactory.class, factoryMock);
sync.registerTransaction(resources);
return it.put(ReactiveTransactionSynchronization.class, sync);
}).as(StepVerifier::create) //
.expectNext(factoryMock) //
.verifyComplete();
}
@Test // gh-107
public void connectionFactoryRetunsConnectionWhenNoSyncronisationActive() {
ConnectionFactory factoryMock = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Publisher<? extends Connection> p = Mono.just(connection);
doReturn(p).when(factoryMock).create();
ConnectionFactoryUtils.getConnection(factoryMock) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
Assertions.assertThat(it.getT1()).isEqualTo(connection);
Assertions.assertThat(it.getT2()).isEqualTo(factoryMock);
})
.verifyComplete();
}
}

View File

@@ -71,7 +71,7 @@ public class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1).flatMap(it -> {
ConnectionFactoryUtils.getConnection(connectionFactoryMock).flatMap(it -> {
return TransactionSynchronizationManager.forCurrentTransaction()
.doOnNext(synchronizationManager -> synchronizationManager.registerSynchronization(sync));
@@ -153,7 +153,7 @@ public class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1) //
ConnectionFactoryUtils.getConnection(connectionFactoryMock) //
.doOnNext(it -> {
it.createStatement("foo");
}).then() //
@@ -179,7 +179,7 @@ public class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator.create(tm);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1).doOnNext(it -> {
ConnectionFactoryUtils.getConnection(connectionFactoryMock).doOnNext(it -> {
throw new IllegalStateException();

View File

@@ -18,20 +18,17 @@ package org.springframework.data.r2dbc.connectionfactory;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.util.concurrent.atomic.AtomicReference;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.r2dbc.connectionfactory.R2dbcTransactionManager;
import org.springframework.data.r2dbc.connectionfactory.ConnectionFactoryUtils;
import org.springframework.data.r2dbc.connectionfactory.ConnectionProxy;
import org.springframework.data.r2dbc.connectionfactory.TransactionAwareConnectionFactoryProxy;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.springframework.transaction.reactive.TransactionalOperator;
/**
* Unit tests for {@link TransactionAwareConnectionFactoryProxy}.
@@ -63,20 +60,17 @@ public class TransactionAwareConnectionFactoryProxyUnitTests {
.as(StepVerifier::create) //
.consumeNextWith(connection -> {
assertThat(connection).isInstanceOf(ConnectionProxy.class);
})
.verifyComplete();
}).verifyComplete();
}
@Test // gh-107
public void unwrapShouldReturnTargetConnection() {
new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() //
.map(ConnectionProxy.class::cast)
.as(StepVerifier::create) //
.map(ConnectionProxy.class::cast).as(StepVerifier::create) //
.consumeNextWith(proxy -> {
assertThat(proxy.unwrap()).isEqualTo(connectionMock1);
})
.verifyComplete();
}).verifyComplete();
}
@Test // gh-107
@@ -85,25 +79,21 @@ public class TransactionAwareConnectionFactoryProxyUnitTests {
when(connectionMock1.close()).thenReturn(Mono.empty());
new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() //
.map(ConnectionProxy.class::cast)
.flatMap(it -> Mono.from(it.close()).then(Mono.just(it)))
.map(ConnectionProxy.class::cast).flatMap(it -> Mono.from(it.close()).then(Mono.just(it)))
.as(StepVerifier::create) //
.consumeNextWith(proxy -> {
assertThat(proxy.unwrap()).isEqualTo(connectionMock1);
})
.verifyComplete();
}).verifyComplete();
}
@Test // gh-107
public void getTargetConnectionShouldReturnTargetConnection() {
new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() //
.map(ConnectionProxy.class::cast)
.as(StepVerifier::create) //
.map(ConnectionProxy.class::cast).as(StepVerifier::create) //
.consumeNextWith(proxy -> {
assertThat(proxy.getTargetConnection()).isEqualTo(connectionMock1);
})
.verifyComplete();
}).verifyComplete();
}
@Test // gh-107
@@ -112,38 +102,32 @@ public class TransactionAwareConnectionFactoryProxyUnitTests {
when(connectionMock1.close()).thenReturn(Mono.empty());
new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() //
.map(ConnectionProxy.class::cast)
.flatMap(it -> Mono.from(it.close()).then(Mono.just(it)))
.map(ConnectionProxy.class::cast).flatMap(it -> Mono.from(it.close()).then(Mono.just(it)))
.as(StepVerifier::create) //
.consumeNextWith(proxy -> {
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> proxy.getTargetConnection());
})
.verifyComplete();
}).verifyComplete();
}
@Test // gh-107
public void hashCodeShouldReturnProxyHash() {
new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() //
.map(ConnectionProxy.class::cast)
.as(StepVerifier::create) //
.map(ConnectionProxy.class::cast).as(StepVerifier::create) //
.consumeNextWith(proxy -> {
assertThat(proxy.hashCode()).isEqualTo(System.identityHashCode(proxy));
})
.verifyComplete();
}).verifyComplete();
}
@Test // gh-107
public void equalsShouldCompareCorrectly() {
new TransactionAwareConnectionFactoryProxy(connectionFactoryMock).create() //
.map(ConnectionProxy.class::cast)
.as(StepVerifier::create) //
.map(ConnectionProxy.class::cast).as(StepVerifier::create) //
.consumeNextWith(proxy -> {
assertThat(proxy.equals(proxy)).isTrue();
assertThat(proxy.equals(connectionMock1)).isFalse();
})
.verifyComplete();
}).verifyComplete();
}
@Test // gh-107
@@ -158,16 +142,16 @@ public class TransactionAwareConnectionFactoryProxyUnitTests {
TransactionAwareConnectionFactoryProxy proxyCf = new TransactionAwareConnectionFactoryProxy(connectionFactoryMock);
ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1) //
ConnectionFactoryUtils.getConnection(connectionFactoryMock) //
.doOnNext(transactionalConnection::set).flatMap(it -> {
return proxyCf.create().doOnNext(connectionFromProxy -> {
return proxyCf.create().doOnNext(connectionFromProxy -> {
ConnectionProxy connectionProxy = (ConnectionProxy) connectionFromProxy;
assertThat(connectionProxy.getTargetConnection()).isSameAs(it);
assertThat(connectionProxy.unwrap()).isSameAs(it);
});
}).as(rxtx::transactional) //
ConnectionProxy connectionProxy = (ConnectionProxy) connectionFromProxy;
assertThat(connectionProxy.getTargetConnection()).isSameAs(it);
assertThat(connectionProxy.unwrap()).isSameAs(it);
});
}).as(rxtx::transactional) //
.flatMapMany(Connection::close) //
.as(StepVerifier::create) //
.verifyComplete();

View File

@@ -347,7 +347,7 @@ public abstract class AbstractDatabaseClientIntegrationTests extends R2dbcIntegr
DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
databaseClient.execute().sql("SELECT COUNT(*) FROM legoset") //
databaseClient.execute("SELECT COUNT(*) FROM legoset") //
.as(Long.class) //
.fetch() //
.all() //
@@ -355,7 +355,7 @@ public abstract class AbstractDatabaseClientIntegrationTests extends R2dbcIntegr
.expectNext(1L) //
.verifyComplete();
databaseClient.execute().sql("SELECT name FROM legoset") //
databaseClient.execute("SELECT name FROM legoset") //
.as(String.class) //
.fetch() //
.one() //

View File

@@ -17,17 +17,18 @@ package org.springframework.data.r2dbc.core;
import static org.assertj.core.api.Assertions.*;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import javax.sql.DataSource;
import org.assertj.core.api.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
@@ -36,22 +37,16 @@ import org.springframework.context.support.GenericApplicationContext;
import org.springframework.dao.DataAccessException;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.connectionfactory.R2dbcTransactionManager;
import org.springframework.data.r2dbc.core.DatabaseClient;
import org.springframework.data.r2dbc.core.TransactionalDatabaseClient;
import org.springframework.data.r2dbc.testing.R2dbcIntegrationTestSupport;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.NoTransactionException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
* Abstract base class for integration tests for {@link TransactionalDatabaseClient}.
* Abstract base class for transactional integration tests for {@link DatabaseClient}.
*
* @author Mark Paluch
* @author Christoph Strobl
@@ -65,6 +60,10 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
AnnotationConfigApplicationContext context;
TransactionalService service;
DatabaseClient databaseClient;
R2dbcTransactionManager transactionManager;
TransactionalOperator rxtx;
@Before
public void before() {
@@ -80,10 +79,13 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
jdbc = createJdbcTemplate(createDataSource());
try {
jdbc.execute("DROP TABLE legoset");
} catch (DataAccessException e) {
}
} catch (DataAccessException e) {}
jdbc.execute(getCreateTableStatement());
jdbc.execute("DELETE FROM legoset");
databaseClient = DatabaseClient.create(connectionFactory);
transactionManager = new R2dbcTransactionManager(connectionFactory);
rxtx = TransactionalOperator.create(transactionManager);
}
@After
@@ -143,15 +145,12 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
@Test // gh-2
public void executeInsertInManagedTransaction() {
TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory);
Flux<Integer> integerFlux = databaseClient.inTransaction(db -> db //
Mono<Integer> integerFlux = databaseClient //
.execute(getInsertIntoLegosetStatement()) //
.bind(0, 42055) //
.bind(1, "SCHAUFELRADBAGGER") //
.bindNull(2, Integer.class) //
.fetch().rowsUpdated() //
);
.fetch().rowsUpdated().as(rxtx::transactional);
integerFlux.as(StepVerifier::create) //
.expectNext(1) //
@@ -163,13 +162,11 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
@Test // gh-2
public void executeInsertInAutoCommitTransaction() {
TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory);
Mono<Integer> integerFlux = databaseClient.execute(getInsertIntoLegosetStatement()) //
.bind(0, 42055) //
.bind(1, "SCHAUFELRADBAGGER") //
.bindNull(2, Integer.class) //
.fetch().rowsUpdated();
.fetch().rowsUpdated().as(rxtx::transactional);
integerFlux.as(StepVerifier::create) //
.expectNext(1) //
@@ -178,58 +175,15 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
assertThat(jdbc.queryForMap("SELECT id, name, manual FROM legoset")).hasEntrySatisfying("id", numberOf(42055));
}
@Test // gh-2
public void shouldManageUserTransaction() {
Queue<Long> transactionIds = new ArrayBlockingQueue<>(5);
TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory);
Flux<Long> txId = databaseClient //
.execute(getCurrentTransactionIdStatement()) //
.map((r, md) -> r.get(0, Long.class)) //
.all();
Mono<Void> then = databaseClient.enableTransactionSynchronization(databaseClient.beginTransaction() //
.thenMany(txId.concatWith(txId).doOnNext(transactionIds::add)) //
.then(databaseClient.rollbackTransaction()));
then.as(StepVerifier::create) //
.verifyComplete();
List<Long> listOfTxIds = new ArrayList<>(transactionIds);
assertThat(listOfTxIds).hasSize(2);
assertThat(listOfTxIds).containsExactly(listOfTxIds.get(1), listOfTxIds.get(0));
}
@Test // gh-2
public void userTransactionManagementShouldFailWithoutSynchronizer() {
TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory);
Mono<Void> then = databaseClient.beginTransaction().then(databaseClient.rollbackTransaction());
then.as(StepVerifier::create) //
.consumeErrorWith(exception -> {
assertThat(exception).isInstanceOf(NoTransactionException.class)
.hasMessageContaining("Transaction management is not enabled");
}).verify();
}
@Test // gh-2
public void shouldRollbackTransaction() {
TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory);
Flux<Integer> integerFlux = databaseClient.inTransaction(db -> {
return db.execute(getInsertIntoLegosetStatement()) //
.bind(0, 42055) //
.bind(1, "SCHAUFELRADBAGGER") //
.bindNull(2, Integer.class) //
.fetch().rowsUpdated() //
.then(Mono.error(new IllegalStateException("failed")));
});
Mono<Object> integerFlux = databaseClient.execute(getInsertIntoLegosetStatement()) //
.bind(0, 42055) //
.bind(1, "SCHAUFELRADBAGGER") //
.bindNull(2, Integer.class) //
.fetch().rowsUpdated() //
.then(Mono.error(new IllegalStateException("failed"))).as(rxtx::transactional);
integerFlux.as(StepVerifier::create) //
.expectError(IllegalStateException.class) //
@@ -242,17 +196,12 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
@Test // gh-2, gh-75, gh-107
public void emitTransactionIds() {
DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
TransactionalOperator transactionalOperator = TransactionalOperator
.create(new R2dbcTransactionManager(connectionFactory), new DefaultTransactionDefinition());
Flux<Object> txId = databaseClient.execute(getCurrentTransactionIdStatement()) //
.map((row, md) -> row.get(0)) //
.all();
Flux<Object> transactionIds = prepareForTransaction(databaseClient).thenMany(txId.concatWith(txId)) //
.as(transactionalOperator::transactional);
.as(rxtx::transactional);
transactionIds.collectList().as(StepVerifier::create) //
.consumeNextWith(actual -> {
@@ -271,8 +220,7 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
TransactionalOperator transactionalOperator = TransactionalOperator
.create(new R2dbcTransactionManager(connectionFactory), new DefaultTransactionDefinition());
Flux<Integer> integerFlux = databaseClient.execute() //
.sql(getInsertIntoLegosetStatement()) //
Flux<Integer> integerFlux = databaseClient.execute(getInsertIntoLegosetStatement()) //
.bind(0, 42055) //
.bind(1, "SCHAUFELRADBAGGER") //
.bindNull(2, Integer.class) //
@@ -290,10 +238,11 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
assertThat(count).isEqualTo(0);
}
@Test //gh-107
@Test // gh-107
public void emitTransactionIdsUsingManagedTransactions() {
service.emitTransactionIds(prepareForTransaction(service.getDatabaseClient()), getCurrentTransactionIdStatement()).collectList().as(StepVerifier::create) //
service.emitTransactionIds(prepareForTransaction(service.getDatabaseClient()), getCurrentTransactionIdStatement())
.collectList().as(StepVerifier::create) //
.consumeNextWith(actual -> {
assertThat(actual).hasSize(2);
@@ -352,19 +301,17 @@ public abstract class AbstractTransactionalDatabaseClientIntegrationTests extend
@Transactional
public Flux<Object> emitTransactionIds(Mono<Void> prepareTransaction, String idStatement) {
Flux<Object> txId = databaseClient.execute() //
.sql(idStatement) //
Flux<Object> txId = databaseClient.execute(idStatement) //
.map((row, md) -> row.get(0)) //
.all();
return prepareTransaction.thenMany(txId.concatWith(txId));
}
@Transactional
public Flux<Integer> shouldRollbackTransactionUsingTransactionalOperator(String insertStatement) {
return databaseClient.execute().sql(insertStatement) //
return databaseClient.execute(insertStatement) //
.bind(0, 42055) //
.bind(1, "SCHAUFELRADBAGGER") //
.bindNull(2, Integer.class) //

View File

@@ -22,14 +22,13 @@ import io.r2dbc.spi.ConnectionFactory;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.data.r2dbc.core.DatabaseClient;
import org.springframework.data.r2dbc.core.TransactionalDatabaseClient;
import org.springframework.data.r2dbc.testing.ExternalDatabase;
import org.springframework.data.r2dbc.testing.MySqlTestSupport;
import reactor.core.publisher.Mono;
/**
* Integration tests for {@link TransactionalDatabaseClient} against MySQL.
* Transactional integration tests for {@link DatabaseClient} against MySQL.
*
* @author Mark Paluch
*/
@@ -63,7 +62,7 @@ public class MySqlTransactionalDatabaseClientIntegrationTests
* batches every now and then.
* @see: https://dev.mysql.com/doc/refman/5.7/en/innodb-information-schema-internal-data.html
*/
return client.execute().sql(getInsertIntoLegosetStatement()) //
return client.execute(getInsertIntoLegosetStatement()) //
.bind(0, 42055) //
.bind(1, "SCHAUFELRADBAGGER") //
.bindNull(2, Integer.class) //
@@ -76,10 +75,4 @@ public class MySqlTransactionalDatabaseClientIntegrationTests
protected String getCurrentTransactionIdStatement() {
return "SELECT tx.trx_id FROM information_schema.innodb_trx tx WHERE tx.trx_mysql_thread_id = connection_id()";
}
@Override
@Test
@Ignore("MySQL creates transactions only on interaction with transactional tables. BEGIN does not create a txid")
public void shouldManageUserTransaction() {
}
}

View File

@@ -5,12 +5,12 @@ import io.r2dbc.spi.ConnectionFactory;
import javax.sql.DataSource;
import org.junit.ClassRule;
import org.springframework.data.r2dbc.core.TransactionalDatabaseClient;
import org.springframework.data.r2dbc.testing.ExternalDatabase;
import org.springframework.data.r2dbc.testing.PostgresTestSupport;
/**
* Integration tests for {@link TransactionalDatabaseClient} against PostgreSQL.
* Transactional integration tests for {@link DatabaseClient} against PostgreSQL.
*
* @author Mark Paluch
*/

View File

@@ -5,12 +5,12 @@ import io.r2dbc.spi.ConnectionFactory;
import javax.sql.DataSource;
import org.junit.ClassRule;
import org.springframework.data.r2dbc.core.TransactionalDatabaseClient;
import org.springframework.data.r2dbc.testing.ExternalDatabase;
import org.springframework.data.r2dbc.testing.SqlServerTestSupport;
/**
* Integration tests for {@link TransactionalDatabaseClient} against Microsoft SQL Server.
* Transactional integration tests for {@link DatabaseClient} against Microsoft SQL Server.
*
* @author Mark Paluch
*/

View File

@@ -40,9 +40,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.PersistenceConstructor;
import org.springframework.data.r2dbc.connectionfactory.R2dbcTransactionManager;
import org.springframework.data.r2dbc.convert.MappingR2dbcConverter;
import org.springframework.data.r2dbc.core.DefaultReactiveDataAccessStrategy;
import org.springframework.data.r2dbc.core.TransactionalDatabaseClient;
import org.springframework.data.r2dbc.dialect.DialectResolver;
import org.springframework.data.r2dbc.dialect.R2dbcDialect;
import org.springframework.data.r2dbc.repository.support.R2dbcRepositoryFactory;
@@ -52,6 +52,7 @@ import org.springframework.data.relational.core.mapping.Table;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.reactive.TransactionalOperator;
/**
* Abstract base class for integration tests for {@link LegoSetRepository} using {@link R2dbcRepositoryFactory}.
@@ -60,9 +61,8 @@ import org.springframework.jdbc.core.JdbcTemplate;
*/
public abstract class AbstractR2dbcRepositoryIntegrationTests extends R2dbcIntegrationTestSupport {
private static RelationalMappingContext mappingContext = new RelationalMappingContext();
@Autowired private LegoSetRepository repository;
@Autowired private ConnectionFactory connectionFactory;
private JdbcTemplate jdbc;
@Before
@@ -174,25 +174,18 @@ public abstract class AbstractR2dbcRepositoryIntegrationTests extends R2dbcInteg
@Test
public void shouldInsertItemsTransactional() {
R2dbcDialect dialect = DialectResolver.getDialect(createConnectionFactory());
DefaultReactiveDataAccessStrategy dataAccessStrategy = new DefaultReactiveDataAccessStrategy(dialect,
new MappingR2dbcConverter(mappingContext));
TransactionalDatabaseClient client = TransactionalDatabaseClient.builder()
.connectionFactory(createConnectionFactory()).dataAccessStrategy(dataAccessStrategy).build();
LegoSetRepository transactionalRepository = new R2dbcRepositoryFactory(client, dataAccessStrategy)
.getRepository(getRepositoryInterfaceType());
R2dbcTransactionManager r2dbcTransactionManager = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator rxtx = TransactionalOperator.create(r2dbcTransactionManager);
LegoSet legoSet1 = new LegoSet(null, "SCHAUFELRADBAGGER", 12);
LegoSet legoSet2 = new LegoSet(null, "FORSCHUNGSSCHIFF", 13);
Flux<Map<String, Object>> transactional = client.inTransaction(db -> {
Mono<Map<String, Object>> transactional = repository.save(legoSet1) //
.map(it -> jdbc.queryForMap("SELECT count(*) AS count FROM legoset")).as(rxtx::transactional);
return transactionalRepository.save(legoSet1) //
.map(it -> jdbc.queryForMap("SELECT count(*) AS count FROM legoset"));
});
Mono<Map<String, Object>> nonTransactional = transactionalRepository.save(legoSet2) //
Mono<Map<String, Object>> nonTransactional = repository.save(legoSet2) //
.map(it -> jdbc.queryForMap("SELECT count(*) AS count FROM legoset"));
transactional.as(StepVerifier::create).expectNext(Collections.singletonMap("count", 0L)).verifyComplete();

View File

@@ -23,6 +23,7 @@ import javax.sql.DataSource;
import org.junit.runner.RunWith;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
@@ -48,6 +49,7 @@ public class H2R2dbcRepositoryIntegrationTests extends AbstractR2dbcRepositoryIn
includeFilters = @Filter(classes = H2LegoSetRepository.class, type = FilterType.ASSIGNABLE_TYPE))
static class IntegrationTestConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
return H2TestSupport.createConnectionFactory();

View File

@@ -24,6 +24,7 @@ import javax.sql.DataSource;
import org.junit.ClassRule;
import org.junit.runner.RunWith;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
@@ -52,6 +53,7 @@ public class MySqlR2dbcRepositoryIntegrationTests extends AbstractR2dbcRepositor
includeFilters = @Filter(classes = MySqlLegoSetRepository.class, type = FilterType.ASSIGNABLE_TYPE))
static class IntegrationTestConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
return MySqlTestSupport.createConnectionFactory(database);

View File

@@ -24,6 +24,7 @@ import javax.sql.DataSource;
import org.junit.ClassRule;
import org.junit.runner.RunWith;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
@@ -52,6 +53,7 @@ public class PostgresR2dbcRepositoryIntegrationTests extends AbstractR2dbcReposi
includeFilters = @Filter(classes = PostgresLegoSetRepository.class, type = FilterType.ASSIGNABLE_TYPE))
static class IntegrationTestConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
return PostgresTestSupport.createConnectionFactory(database);

View File

@@ -25,6 +25,7 @@ import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
@@ -53,6 +54,7 @@ public class SqlServerR2dbcRepositoryIntegrationTests extends AbstractR2dbcRepos
includeFilters = @Filter(classes = SqlServerLegoSetRepository.class, type = FilterType.ASSIGNABLE_TYPE))
static class IntegrationTestConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
return SqlServerTestSupport.createConnectionFactory(database);