diff --git a/src/database/connection.rs b/src/database/connection.rs index 4919ecb25..b6a3c1652 100644 --- a/src/database/connection.rs +++ b/src/database/connection.rs @@ -47,6 +47,48 @@ pub trait StreamTrait: Send + Sync { ) -> Pin, DbErr>> + 'a + Send>>; } +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +/// Isolation level +pub enum IsolationLevel { + /// Consistent reads within the same transaction read the snapshot established by the first read. + RepeatableRead, + /// Each consistent read, even within the same transaction, sets and reads its own fresh snapshot. + ReadCommitted, + /// SELECT statements are performed in a nonlocking fashion, but a possible earlier version of a row might be used. + ReadUncommitted, + /// All statements of the current transaction can only see rows committed before the first query or data-modification statement was executed in this transaction. + Serializable, +} + +impl std::fmt::Display for IsolationLevel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IsolationLevel::RepeatableRead => write!(f, "REPEATABLE READ"), + IsolationLevel::ReadCommitted => write!(f, "READ COMMITTED"), + IsolationLevel::ReadUncommitted => write!(f, "READ UNCOMMITTED"), + IsolationLevel::Serializable => write!(f, "SERIALIZABLE"), + } + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +/// Access mode +pub enum AccessMode { + /// Data can't be modified in this transaction + ReadOnly, + /// Data can be modified in this transaction (default) + ReadWrite, +} + +impl std::fmt::Display for AccessMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AccessMode::ReadOnly => write!(f, "READ ONLY"), + AccessMode::ReadWrite => write!(f, "READ WRITE"), + } + } +} + /// Spawn database transaction #[async_trait::async_trait] pub trait TransactionTrait { @@ -54,6 +96,14 @@ pub trait TransactionTrait { /// Returns a Transaction that can be committed or rolled back async fn begin(&self) -> Result; + /// Execute SQL `BEGIN` transaction with isolation level and/or access mode. + /// Returns a Transaction that can be committed or rolled back + async fn begin_with_config( + &self, + isolation_level: Option, + access_mode: Option, + ) -> Result; + /// Execute the function inside a transaction. /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. async fn transaction(&self, callback: F) -> Result> @@ -64,4 +114,20 @@ pub trait TransactionTrait { + Send, T: Send, E: std::error::Error + Send; + + /// Execute the function inside a transaction with isolation level and/or access mode. + /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. + async fn transaction_with_config( + &self, + callback: F, + isolation_level: Option, + access_mode: Option, + ) -> Result> + where + F: for<'c> FnOnce( + &'c DatabaseTransaction, + ) -> Pin> + Send + 'c>> + + Send, + T: Send, + E: std::error::Error + Send; } diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index 7fd51633b..9e40fb88e 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -1,6 +1,6 @@ use crate::{ - error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement, - StatementBuilder, StreamTrait, TransactionError, TransactionTrait, + error::*, AccessMode, ConnectionTrait, DatabaseTransaction, ExecResult, IsolationLevel, + QueryResult, Statement, StatementBuilder, StreamTrait, TransactionError, TransactionTrait, }; use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder}; use std::{future::Future, pin::Pin}; @@ -198,11 +198,38 @@ impl TransactionTrait for DatabaseConnection { async fn begin(&self) -> Result { match self { #[cfg(feature = "sqlx-mysql")] - DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin().await, + DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin(None, None).await, #[cfg(feature = "sqlx-postgres")] - DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin().await, + DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin(None, None).await, #[cfg(feature = "sqlx-sqlite")] - DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await, + DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin(None, None).await, + #[cfg(feature = "mock")] + DatabaseConnection::MockDatabaseConnection(conn) => { + DatabaseTransaction::new_mock(Arc::clone(conn), None).await + } + DatabaseConnection::Disconnected => panic!("Disconnected"), + } + } + + #[instrument(level = "trace")] + async fn begin_with_config( + &self, + _isolation_level: Option, + _access_mode: Option, + ) -> Result { + match self { + #[cfg(feature = "sqlx-mysql")] + DatabaseConnection::SqlxMySqlPoolConnection(conn) => { + conn.begin(_isolation_level, _access_mode).await + } + #[cfg(feature = "sqlx-postgres")] + DatabaseConnection::SqlxPostgresPoolConnection(conn) => { + conn.begin(_isolation_level, _access_mode).await + } + #[cfg(feature = "sqlx-sqlite")] + DatabaseConnection::SqlxSqlitePoolConnection(conn) => { + conn.begin(_isolation_level, _access_mode).await + } #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => { DatabaseTransaction::new_mock(Arc::clone(conn), None).await @@ -225,13 +252,61 @@ impl TransactionTrait for DatabaseConnection { { match self { #[cfg(feature = "sqlx-mysql")] - DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.transaction(_callback).await, + DatabaseConnection::SqlxMySqlPoolConnection(conn) => { + conn.transaction(_callback, None, None).await + } + #[cfg(feature = "sqlx-postgres")] + DatabaseConnection::SqlxPostgresPoolConnection(conn) => { + conn.transaction(_callback, None, None).await + } + #[cfg(feature = "sqlx-sqlite")] + DatabaseConnection::SqlxSqlitePoolConnection(conn) => { + conn.transaction(_callback, None, None).await + } + #[cfg(feature = "mock")] + DatabaseConnection::MockDatabaseConnection(conn) => { + let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None) + .await + .map_err(TransactionError::Connection)?; + transaction.run(_callback).await + } + DatabaseConnection::Disconnected => panic!("Disconnected"), + } + } + + /// Execute the function inside a transaction. + /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. + #[instrument(level = "trace", skip(_callback))] + async fn transaction_with_config( + &self, + _callback: F, + _isolation_level: Option, + _access_mode: Option, + ) -> Result> + where + F: for<'c> FnOnce( + &'c DatabaseTransaction, + ) -> Pin> + Send + 'c>> + + Send, + T: Send, + E: std::error::Error + Send, + { + match self { + #[cfg(feature = "sqlx-mysql")] + DatabaseConnection::SqlxMySqlPoolConnection(conn) => { + conn.transaction(_callback, _isolation_level, _access_mode) + .await + } #[cfg(feature = "sqlx-postgres")] DatabaseConnection::SqlxPostgresPoolConnection(conn) => { - conn.transaction(_callback).await + conn.transaction(_callback, _isolation_level, _access_mode) + .await } #[cfg(feature = "sqlx-sqlite")] - DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await, + DatabaseConnection::SqlxSqlitePoolConnection(conn) => { + conn.transaction(_callback, _isolation_level, _access_mode) + .await + } #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => { let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None) diff --git a/src/database/transaction.rs b/src/database/transaction.rs index 85fd304a6..bfb6f2bbd 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -1,6 +1,6 @@ use crate::{ - debug_print, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult, - Statement, StreamTrait, TransactionStream, TransactionTrait, + debug_print, AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, + IsolationLevel, QueryResult, Statement, StreamTrait, TransactionStream, TransactionTrait, }; #[cfg(feature = "sqlx-dep")] use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err}; @@ -31,11 +31,15 @@ impl DatabaseTransaction { pub(crate) async fn new_mysql( inner: PoolConnection, metric_callback: Option, + isolation_level: Option, + access_mode: Option, ) -> Result { Self::begin( Arc::new(Mutex::new(InnerConnection::MySql(inner))), DbBackend::MySql, metric_callback, + isolation_level, + access_mode, ) .await } @@ -44,11 +48,15 @@ impl DatabaseTransaction { pub(crate) async fn new_postgres( inner: PoolConnection, metric_callback: Option, + isolation_level: Option, + access_mode: Option, ) -> Result { Self::begin( Arc::new(Mutex::new(InnerConnection::Postgres(inner))), DbBackend::Postgres, metric_callback, + isolation_level, + access_mode, ) .await } @@ -57,11 +65,15 @@ impl DatabaseTransaction { pub(crate) async fn new_sqlite( inner: PoolConnection, metric_callback: Option, + isolation_level: Option, + access_mode: Option, ) -> Result { Self::begin( Arc::new(Mutex::new(InnerConnection::Sqlite(inner))), DbBackend::Sqlite, metric_callback, + isolation_level, + access_mode, ) .await } @@ -76,6 +88,8 @@ impl DatabaseTransaction { Arc::new(Mutex::new(InnerConnection::Mock(inner))), backend, metric_callback, + None, + None, ) .await } @@ -86,6 +100,8 @@ impl DatabaseTransaction { conn: Arc>, backend: DbBackend, metric_callback: Option, + isolation_level: Option, + access_mode: Option, ) -> Result { let res = DatabaseTransaction { conn, @@ -96,21 +112,34 @@ impl DatabaseTransaction { match *res.conn.lock().await { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(ref mut c) => { + // in MySQL SET TRANSACTION operations must be executed before transaction start + crate::driver::sqlx_mysql::set_transaction_config(c, isolation_level, access_mode) + .await?; ::TransactionManager::begin(c) .await - .map_err(sqlx_error_to_query_err)? + .map_err(sqlx_error_to_query_err)?; } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(ref mut c) => { ::TransactionManager::begin(c) .await - .map_err(sqlx_error_to_query_err)? + .map_err(sqlx_error_to_query_err)?; + // in PostgreSQL SET TRANSACTION operations must be executed inside transaction + crate::driver::sqlx_postgres::set_transaction_config( + c, + isolation_level, + access_mode, + ) + .await?; } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(ref mut c) => { + // in SQLite isolation level and access mode are global settings + crate::driver::sqlx_sqlite::set_transaction_config(c, isolation_level, access_mode) + .await?; ::TransactionManager::begin(c) .await - .map_err(sqlx_error_to_query_err)? + .map_err(sqlx_error_to_query_err)?; } #[cfg(feature = "mock")] InnerConnection::Mock(ref mut c) => { @@ -415,6 +444,24 @@ impl TransactionTrait for DatabaseTransaction { Arc::clone(&self.conn), self.backend, self.metric_callback.clone(), + None, + None, + ) + .await + } + + #[instrument(level = "trace")] + async fn begin_with_config( + &self, + isolation_level: Option, + access_mode: Option, + ) -> Result { + DatabaseTransaction::begin( + Arc::clone(&self.conn), + self.backend, + self.metric_callback.clone(), + isolation_level, + access_mode, ) .await } @@ -434,6 +481,30 @@ impl TransactionTrait for DatabaseTransaction { let transaction = self.begin().await.map_err(TransactionError::Connection)?; transaction.run(_callback).await } + + /// Execute the function inside a transaction with isolation level and/or access mode. + /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. + #[instrument(level = "trace", skip(_callback))] + async fn transaction_with_config( + &self, + _callback: F, + isolation_level: Option, + access_mode: Option, + ) -> Result> + where + F: for<'c> FnOnce( + &'c DatabaseTransaction, + ) -> Pin> + Send + 'c>> + + Send, + T: Send, + E: std::error::Error + Send, + { + let transaction = self + .begin_with_config(isolation_level, access_mode) + .await + .map_err(TransactionError::Connection)?; + transaction.run(_callback).await + } } /// Defines errors for handling transaction failures diff --git a/src/driver/sqlx_mysql.rs b/src/driver/sqlx_mysql.rs index 73dfa59fd..e66eeefb4 100644 --- a/src/driver/sqlx_mysql.rs +++ b/src/driver/sqlx_mysql.rs @@ -3,15 +3,16 @@ use std::{future::Future, pin::Pin, sync::Arc}; use sqlx::{ mysql::{MySqlConnectOptions, MySqlQueryResult, MySqlRow}, - MySql, MySqlPool, + pool::PoolConnection, + Executor, MySql, MySqlPool, }; use sea_query_binder::SqlxValues; use tracing::instrument; use crate::{ - debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction, - QueryStream, Statement, TransactionError, + debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection, + DatabaseTransaction, DbBackend, IsolationLevel, QueryStream, Statement, TransactionError, }; use super::sqlx_common::*; @@ -150,9 +151,19 @@ impl SqlxMySqlPoolConnection { /// Bundle a set of SQL statements that execute together. #[instrument(level = "trace")] - pub async fn begin(&self) -> Result { + pub async fn begin( + &self, + isolation_level: Option, + access_mode: Option, + ) -> Result { if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_mysql(conn, self.metric_callback.clone()).await + DatabaseTransaction::new_mysql( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await } else { Err(DbErr::ConnectionAcquire) } @@ -160,7 +171,12 @@ impl SqlxMySqlPoolConnection { /// Create a MySQL transaction #[instrument(level = "trace", skip(callback))] - pub async fn transaction(&self, callback: F) -> Result> + pub async fn transaction( + &self, + callback: F, + isolation_level: Option, + access_mode: Option, + ) -> Result> where F: for<'b> FnOnce( &'b DatabaseTransaction, @@ -170,9 +186,14 @@ impl SqlxMySqlPoolConnection { E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_mysql(conn, self.metric_callback.clone()) - .await - .map_err(|e| TransactionError::Connection(e))?; + let transaction = DatabaseTransaction::new_mysql( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await + .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await } else { Err(TransactionError::Connection(DbErr::ConnectionAcquire)) @@ -210,3 +231,29 @@ pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, MySql, Sqlx .map_or(Values(Vec::new()), |values| values.clone()); sqlx::query_with(&stmt.sql, SqlxValues(values)) } + +pub(crate) async fn set_transaction_config( + conn: &mut PoolConnection, + isolation_level: Option, + access_mode: Option, +) -> Result<(), DbErr> { + if let Some(isolation_level) = isolation_level { + let stmt = Statement { + sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"), + values: None, + db_backend: DbBackend::MySql, + }; + let query = sqlx_query(&stmt); + conn.execute(query).await.map_err(sqlx_error_to_exec_err)?; + } + if let Some(access_mode) = access_mode { + let stmt = Statement { + sql: format!("SET TRANSACTION {access_mode}"), + values: None, + db_backend: DbBackend::MySql, + }; + let query = sqlx_query(&stmt); + conn.execute(query).await.map_err(sqlx_error_to_exec_err)?; + } + Ok(()) +} diff --git a/src/driver/sqlx_postgres.rs b/src/driver/sqlx_postgres.rs index 167b34b85..224dd13a3 100644 --- a/src/driver/sqlx_postgres.rs +++ b/src/driver/sqlx_postgres.rs @@ -2,16 +2,17 @@ use sea_query::Values; use std::{future::Future, pin::Pin, sync::Arc}; use sqlx::{ + pool::PoolConnection, postgres::{PgConnectOptions, PgQueryResult, PgRow}, - PgPool, Postgres, + Executor, PgPool, Postgres, }; use sea_query_binder::SqlxValues; use tracing::instrument; use crate::{ - debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction, - QueryStream, Statement, TransactionError, + debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection, + DatabaseTransaction, DbBackend, IsolationLevel, QueryStream, Statement, TransactionError, }; use super::sqlx_common::*; @@ -165,9 +166,19 @@ impl SqlxPostgresPoolConnection { /// Bundle a set of SQL statements that execute together. #[instrument(level = "trace")] - pub async fn begin(&self) -> Result { + pub async fn begin( + &self, + isolation_level: Option, + access_mode: Option, + ) -> Result { if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_postgres(conn, self.metric_callback.clone()).await + DatabaseTransaction::new_postgres( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await } else { Err(DbErr::ConnectionAcquire) } @@ -175,7 +186,12 @@ impl SqlxPostgresPoolConnection { /// Create a PostgreSQL transaction #[instrument(level = "trace", skip(callback))] - pub async fn transaction(&self, callback: F) -> Result> + pub async fn transaction( + &self, + callback: F, + isolation_level: Option, + access_mode: Option, + ) -> Result> where F: for<'b> FnOnce( &'b DatabaseTransaction, @@ -185,9 +201,14 @@ impl SqlxPostgresPoolConnection { E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_postgres(conn, self.metric_callback.clone()) - .await - .map_err(|e| TransactionError::Connection(e))?; + let transaction = DatabaseTransaction::new_postgres( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await + .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await } else { Err(TransactionError::Connection(DbErr::ConnectionAcquire)) @@ -225,3 +246,29 @@ pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, S .map_or(Values(Vec::new()), |values| values.clone()); sqlx::query_with(&stmt.sql, SqlxValues(values)) } + +pub(crate) async fn set_transaction_config( + conn: &mut PoolConnection, + isolation_level: Option, + access_mode: Option, +) -> Result<(), DbErr> { + if let Some(isolation_level) = isolation_level { + let stmt = Statement { + sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"), + values: None, + db_backend: DbBackend::Postgres, + }; + let query = sqlx_query(&stmt); + conn.execute(query).await.map_err(sqlx_error_to_exec_err)?; + } + if let Some(access_mode) = access_mode { + let stmt = Statement { + sql: format!("SET TRANSACTION {access_mode}"), + values: None, + db_backend: DbBackend::Postgres, + }; + let query = sqlx_query(&stmt); + conn.execute(query).await.map_err(sqlx_error_to_exec_err)?; + } + Ok(()) +} diff --git a/src/driver/sqlx_sqlite.rs b/src/driver/sqlx_sqlite.rs index b09f1bd0d..dea481fa0 100644 --- a/src/driver/sqlx_sqlite.rs +++ b/src/driver/sqlx_sqlite.rs @@ -2,16 +2,17 @@ use sea_query::Values; use std::{future::Future, pin::Pin, sync::Arc}; use sqlx::{ + pool::PoolConnection, sqlite::{SqliteConnectOptions, SqliteQueryResult, SqliteRow}, Sqlite, SqlitePool, }; use sea_query_binder::SqlxValues; -use tracing::instrument; +use tracing::{instrument, warn}; use crate::{ - debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction, - QueryStream, Statement, TransactionError, + debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection, + DatabaseTransaction, IsolationLevel, QueryStream, Statement, TransactionError, }; use super::sqlx_common::*; @@ -157,9 +158,19 @@ impl SqlxSqlitePoolConnection { /// Bundle a set of SQL statements that execute together. #[instrument(level = "trace")] - pub async fn begin(&self) -> Result { + pub async fn begin( + &self, + isolation_level: Option, + access_mode: Option, + ) -> Result { if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_sqlite(conn, self.metric_callback.clone()).await + DatabaseTransaction::new_sqlite( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await } else { Err(DbErr::ConnectionAcquire) } @@ -167,7 +178,12 @@ impl SqlxSqlitePoolConnection { /// Create a MySQL transaction #[instrument(level = "trace", skip(callback))] - pub async fn transaction(&self, callback: F) -> Result> + pub async fn transaction( + &self, + callback: F, + isolation_level: Option, + access_mode: Option, + ) -> Result> where F: for<'b> FnOnce( &'b DatabaseTransaction, @@ -177,9 +193,14 @@ impl SqlxSqlitePoolConnection { E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_sqlite(conn, self.metric_callback.clone()) - .await - .map_err(|e| TransactionError::Connection(e))?; + let transaction = DatabaseTransaction::new_sqlite( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await + .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await } else { Err(TransactionError::Connection(DbErr::ConnectionAcquire)) @@ -217,3 +238,17 @@ pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Sqlite, Sql .map_or(Values(Vec::new()), |values| values.clone()); sqlx::query_with(&stmt.sql, SqlxValues(values)) } + +pub(crate) async fn set_transaction_config( + _conn: &mut PoolConnection, + isolation_level: Option, + access_mode: Option, +) -> Result<(), DbErr> { + if isolation_level.is_some() { + warn!("Setting isolation level in a SQLite transaction isn't supported"); + } + if access_mode.is_some() { + warn!("Setting access mode in a SQLite transaction isn't supported"); + } + Ok(()) +}