Skip to content

Commit

Permalink
Transactions Isolation level and Access mode (#1230)
Browse files Browse the repository at this point in the history
* Transactions Isolation level and Access mode

* Fix typo

* Fix clippy lints
  • Loading branch information
nappa85 authored and tyt2y3 committed Dec 2, 2022
1 parent 0c09c1f commit 41cc840
Show file tree
Hide file tree
Showing 6 changed files with 381 additions and 40 deletions.
66 changes: 66 additions & 0 deletions src/database/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,63 @@ pub trait StreamTrait: Send + Sync {
) -> Pin<Box<dyn Future<Output = Result<Self::Stream<'a>, DbErr>> + 'a + Send>>;
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Isolation level
pub enum IsolationLevel {
/// Consistent reads within the same transaction read the snapshot established by the first read.
RepeatableRead,
/// Each consistent read, even within the same transaction, sets and reads its own fresh snapshot.
ReadCommitted,
/// SELECT statements are performed in a nonlocking fashion, but a possible earlier version of a row might be used.
ReadUncommitted,
/// All statements of the current transaction can only see rows committed before the first query or data-modification statement was executed in this transaction.
Serializable,
}

impl std::fmt::Display for IsolationLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IsolationLevel::RepeatableRead => write!(f, "REPEATABLE READ"),
IsolationLevel::ReadCommitted => write!(f, "READ COMMITTED"),
IsolationLevel::ReadUncommitted => write!(f, "READ UNCOMMITTED"),
IsolationLevel::Serializable => write!(f, "SERIALIZABLE"),
}
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Access mode
pub enum AccessMode {
/// Data can't be modified in this transaction
ReadOnly,
/// Data can be modified in this transaction (default)
ReadWrite,
}

impl std::fmt::Display for AccessMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AccessMode::ReadOnly => write!(f, "READ ONLY"),
AccessMode::ReadWrite => write!(f, "READ WRITE"),
}
}
}

/// Spawn database transaction
#[async_trait::async_trait]
pub trait TransactionTrait {
/// Execute SQL `BEGIN` transaction.
/// Returns a Transaction that can be committed or rolled back
async fn begin(&self) -> Result<DatabaseTransaction, DbErr>;

/// Execute SQL `BEGIN` transaction with isolation level and/or access mode.
/// Returns a Transaction that can be committed or rolled back
async fn begin_with_config(
&self,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr>;

/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
Expand All @@ -64,4 +114,20 @@ pub trait TransactionTrait {
+ Send,
T: Send,
E: std::error::Error + Send;

/// Execute the function inside a transaction with isolation level and/or access mode.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction_with_config<F, T, E>(
&self,
callback: F,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::error::Error + Send;
}
91 changes: 83 additions & 8 deletions src/database/db_connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement,
StatementBuilder, StreamTrait, TransactionError, TransactionTrait,
error::*, AccessMode, ConnectionTrait, DatabaseTransaction, ExecResult, IsolationLevel,
QueryResult, Statement, StatementBuilder, StreamTrait, TransactionError, TransactionTrait,
};
use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder};
use std::{future::Future, pin::Pin};
Expand Down Expand Up @@ -198,11 +198,38 @@ impl TransactionTrait for DatabaseConnection {
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin().await,
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin(None, None).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin().await,
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin(None, None).await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await,
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin(None, None).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}

#[instrument(level = "trace")]
async fn begin_with_config(
&self,
_isolation_level: Option<IsolationLevel>,
_access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode).await
}
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode).await
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode).await
}
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
Expand All @@ -225,13 +252,61 @@ impl TransactionTrait for DatabaseConnection {
{
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.transaction(_callback).await,
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
conn.transaction(_callback, None, None).await
}
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.transaction(_callback, None, None).await
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
conn.transaction(_callback, None, None).await
}
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None)
.await
.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}

/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
#[instrument(level = "trace", skip(_callback))]
async fn transaction_with_config<F, T, E>(
&self,
_callback: F,
_isolation_level: Option<IsolationLevel>,
_access_mode: Option<AccessMode>,
) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::error::Error + Send,
{
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
conn.transaction(_callback, _isolation_level, _access_mode)
.await
}
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.transaction(_callback).await
conn.transaction(_callback, _isolation_level, _access_mode)
.await
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await,
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
conn.transaction(_callback, _isolation_level, _access_mode)
.await
}
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None)
Expand Down
81 changes: 76 additions & 5 deletions src/database/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
debug_print, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult,
Statement, StreamTrait, TransactionStream, TransactionTrait,
debug_print, AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection,
IsolationLevel, QueryResult, Statement, StreamTrait, TransactionStream, TransactionTrait,
};
#[cfg(feature = "sqlx-dep")]
use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
Expand Down Expand Up @@ -31,11 +31,15 @@ impl DatabaseTransaction {
pub(crate) async fn new_mysql(
inner: PoolConnection<sqlx::MySql>,
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::MySql(inner))),
DbBackend::MySql,
metric_callback,
isolation_level,
access_mode,
)
.await
}
Expand All @@ -44,11 +48,15 @@ impl DatabaseTransaction {
pub(crate) async fn new_postgres(
inner: PoolConnection<sqlx::Postgres>,
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::Postgres(inner))),
DbBackend::Postgres,
metric_callback,
isolation_level,
access_mode,
)
.await
}
Expand All @@ -57,11 +65,15 @@ impl DatabaseTransaction {
pub(crate) async fn new_sqlite(
inner: PoolConnection<sqlx::Sqlite>,
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::Sqlite(inner))),
DbBackend::Sqlite,
metric_callback,
isolation_level,
access_mode,
)
.await
}
Expand All @@ -76,6 +88,8 @@ impl DatabaseTransaction {
Arc::new(Mutex::new(InnerConnection::Mock(inner))),
backend,
metric_callback,
None,
None,
)
.await
}
Expand All @@ -86,6 +100,8 @@ impl DatabaseTransaction {
conn: Arc<Mutex<InnerConnection>>,
backend: DbBackend,
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
let res = DatabaseTransaction {
conn,
Expand All @@ -96,21 +112,34 @@ impl DatabaseTransaction {
match *res.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
// in MySQL SET TRANSACTION operations must be executed before transaction start
crate::driver::sqlx_mysql::set_transaction_config(c, isolation_level, access_mode)
.await?;
<sqlx::MySql as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)?;
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)?;
// in PostgreSQL SET TRANSACTION operations must be executed inside transaction
crate::driver::sqlx_postgres::set_transaction_config(
c,
isolation_level,
access_mode,
)
.await?;
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
// in SQLite isolation level and access mode are global settings
crate::driver::sqlx_sqlite::set_transaction_config(c, isolation_level, access_mode)
.await?;
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)?;
}
#[cfg(feature = "mock")]
InnerConnection::Mock(ref mut c) => {
Expand Down Expand Up @@ -415,6 +444,24 @@ impl TransactionTrait for DatabaseTransaction {
Arc::clone(&self.conn),
self.backend,
self.metric_callback.clone(),
None,
None,
)
.await
}

#[instrument(level = "trace")]
async fn begin_with_config(
&self,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
DatabaseTransaction::begin(
Arc::clone(&self.conn),
self.backend,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
}
Expand All @@ -434,6 +481,30 @@ impl TransactionTrait for DatabaseTransaction {
let transaction = self.begin().await.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}

/// Execute the function inside a transaction with isolation level and/or access mode.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
#[instrument(level = "trace", skip(_callback))]
async fn transaction_with_config<F, T, E>(
&self,
_callback: F,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::error::Error + Send,
{
let transaction = self
.begin_with_config(isolation_level, access_mode)
.await
.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}
}

/// Defines errors for handling transaction failures
Expand Down
Loading

0 comments on commit 41cc840

Please sign in to comment.