diff --git a/src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java b/src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java index 5f557b5..fa6e6c3 100644 --- a/src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java +++ b/src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java @@ -386,7 +386,7 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor { return statement; }; - Function> resultFunction = it -> Flux.from(executeFunction.apply(it).execute()); + Function> resultFunction = toExecuteFunction(sql, executeFunction); return new DefaultSqlResult<>(DefaultDatabaseClient.this, // sql, // @@ -707,7 +707,7 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor { String sql = getRequiredSql(preparedOperation); Function selectFunction = wrapPreparedOperation(sql, preparedOperation); - Function> resultFunction = it -> Flux.from(selectFunction.apply(it).execute()); + Function> resultFunction = DefaultDatabaseClient.toExecuteFunction(sql, selectFunction); return new DefaultSqlResult<>(DefaultDatabaseClient.this, // sql, // @@ -1377,7 +1377,7 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor { String sql = getRequiredSql(operation); Function insertFunction = wrapPreparedOperation(sql, operation) .andThen(statement -> statement.returnGeneratedValues()); - Function> resultFunction = it -> Flux.from(insertFunction.apply(it).execute()); + Function> resultFunction = toExecuteFunction(sql, insertFunction); return new DefaultSqlResult<>(this, // sql, // @@ -1390,7 +1390,7 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor { String sql = getRequiredSql(operation); Function executeFunction = wrapPreparedOperation(sql, operation); - Function> resultFunction = it -> Flux.from(executeFunction.apply(it).execute()); + Function> resultFunction = toExecuteFunction(sql, executeFunction); return new DefaultSqlResult<>(this, // sql, // @@ -1421,6 +1421,16 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor { }; } + private static Function> toExecuteFunction(String sql, + Function executeFunction) { + + return it -> { + + Flux from = Flux.defer(() -> executeFunction.apply(it).execute()).cast(Result.class); + return from.checkpoint("SQL \"" + sql + "\" [DatabaseClient]"); + }; + } + private static Flux doInConnectionMany(Connection connection, Function> action) { try {