Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions Isolation level and Access mode #1230

Merged
merged 3 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/database/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,63 @@ pub trait StreamTrait: Send + Sync {
) -> Pin<Box<dyn Future<Output = Result<Self::Stream<'a>, DbErr>> + 'a + Send>>;
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Isolation level
pub enum IsolationLevel {
/// Consistent reads within the same transaction read the snapshot established by the first read.
RepeatableRead,
/// Each consistent read, even within the same transaction, sets and reads its own fresh snapshot.
ReadCommitted,
/// SELECT statements are performed in a nonlocking fashion, but a possible earlier version of a row might be used.
ReadUncommitted,
/// All statements of the current transaction can only see rows committed before the first query or data-modification statement was executed in this transaction.
Serializable,
}

impl std::fmt::Display for IsolationLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IsolationLevel::RepeatableRead => write!(f, "REPEATABLE READ"),
IsolationLevel::ReadCommitted => write!(f, "READ COMMITTED"),
IsolationLevel::ReadUncommitted => write!(f, "READ UNCOMMITTED"),
IsolationLevel::Serializable => write!(f, "SERIALIZABLE"),
}
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Access mode
pub enum AccessMode {
/// Data can't be modified in this transaction
ReadOnly,
/// Data can be modified in this transaction (default)
ReadWrite,
}

impl std::fmt::Display for AccessMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AccessMode::ReadOnly => write!(f, "READ ONLY"),
AccessMode::ReadWrite => write!(f, "READ WRITE"),
}
}
}

/// Spawn database transaction
#[async_trait::async_trait]
pub trait TransactionTrait {
/// Execute SQL `BEGIN` transaction.
/// Returns a Transaction that can be committed or rolled back
async fn begin(&self) -> Result<DatabaseTransaction, DbErr>;

/// Execute SQL `BEGIN` transaction with isolation level and/or access mode.
/// Returns a Transaction that can be committed or rolled back
async fn begin_with_config(
&self,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr>;

/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
Expand All @@ -64,4 +114,20 @@ pub trait TransactionTrait {
+ Send,
T: Send,
E: std::error::Error + Send;

/// Execute the function inside a transaction with isolation level and/or access mode.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction_with_config<F, T, E>(
&self,
callback: F,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::error::Error + Send;
}
91 changes: 83 additions & 8 deletions src/database/db_connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement,
StatementBuilder, StreamTrait, TransactionError, TransactionTrait,
error::*, AccessMode, ConnectionTrait, DatabaseTransaction, ExecResult, IsolationLevel,
QueryResult, Statement, StatementBuilder, StreamTrait, TransactionError, TransactionTrait,
};
use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder};
use std::{future::Future, pin::Pin};
Expand Down Expand Up @@ -198,11 +198,38 @@ impl TransactionTrait for DatabaseConnection {
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin().await,
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin(None, None).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin().await,
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin(None, None).await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await,
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin(None, None).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}

#[instrument(level = "trace")]
async fn begin_with_config(
&self,
_isolation_level: Option<IsolationLevel>,
_access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode).await
}
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode).await
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode).await
}
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
Expand All @@ -225,13 +252,61 @@ impl TransactionTrait for DatabaseConnection {
{
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.transaction(_callback).await,
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
conn.transaction(_callback, None, None).await
}
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.transaction(_callback, None, None).await
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
conn.transaction(_callback, None, None).await
}
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None)
.await
.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}

/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
#[instrument(level = "trace", skip(_callback))]
async fn transaction_with_config<F, T, E>(
&self,
_callback: F,
_isolation_level: Option<IsolationLevel>,
_access_mode: Option<AccessMode>,
) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::error::Error + Send,
{
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
conn.transaction(_callback, _isolation_level, _access_mode)
.await
}
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.transaction(_callback).await
conn.transaction(_callback, _isolation_level, _access_mode)
.await
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await,
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
conn.transaction(_callback, _isolation_level, _access_mode)
.await
}
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None)
Expand Down
81 changes: 76 additions & 5 deletions src/database/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
debug_print, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult,
Statement, StreamTrait, TransactionStream, TransactionTrait,
debug_print, AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection,
IsolationLevel, QueryResult, Statement, StreamTrait, TransactionStream, TransactionTrait,
};
#[cfg(feature = "sqlx-dep")]
use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
Expand Down Expand Up @@ -31,11 +31,15 @@ impl DatabaseTransaction {
pub(crate) async fn new_mysql(
inner: PoolConnection<sqlx::MySql>,
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::MySql(inner))),
DbBackend::MySql,
metric_callback,
isolation_level,
access_mode,
)
.await
}
Expand All @@ -44,11 +48,15 @@ impl DatabaseTransaction {
pub(crate) async fn new_postgres(
inner: PoolConnection<sqlx::Postgres>,
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::Postgres(inner))),
DbBackend::Postgres,
metric_callback,
isolation_level,
access_mode,
)
.await
}
Expand All @@ -57,11 +65,15 @@ impl DatabaseTransaction {
pub(crate) async fn new_sqlite(
inner: PoolConnection<sqlx::Sqlite>,
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::Sqlite(inner))),
DbBackend::Sqlite,
metric_callback,
isolation_level,
access_mode,
)
.await
}
Expand All @@ -76,6 +88,8 @@ impl DatabaseTransaction {
Arc::new(Mutex::new(InnerConnection::Mock(inner))),
backend,
metric_callback,
None,
None,
)
.await
}
Expand All @@ -86,6 +100,8 @@ impl DatabaseTransaction {
conn: Arc<Mutex<InnerConnection>>,
backend: DbBackend,
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
let res = DatabaseTransaction {
conn,
Expand All @@ -96,21 +112,34 @@ impl DatabaseTransaction {
match *res.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
// in MySQL SET TRANSACTION operations must be executed before transaction start
crate::driver::sqlx_mysql::set_transaction_config(c, isolation_level, access_mode)
.await?;
<sqlx::MySql as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)?;
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)?;
// in PostgreSQL SET TRANSACTION operations must be executed inside transaction
crate::driver::sqlx_postgres::set_transaction_config(
c,
isolation_level,
access_mode,
)
.await?;
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
// in SQLite isolation level and access mode are global settings
crate::driver::sqlx_sqlite::set_transaction_config(c, isolation_level, access_mode)
.await?;
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
.map_err(sqlx_error_to_query_err)?;
}
#[cfg(feature = "mock")]
InnerConnection::Mock(ref mut c) => {
Expand Down Expand Up @@ -415,6 +444,24 @@ impl TransactionTrait for DatabaseTransaction {
Arc::clone(&self.conn),
self.backend,
self.metric_callback.clone(),
None,
None,
)
.await
}

#[instrument(level = "trace")]
async fn begin_with_config(
&self,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
DatabaseTransaction::begin(
Arc::clone(&self.conn),
self.backend,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
}
Expand All @@ -434,6 +481,30 @@ impl TransactionTrait for DatabaseTransaction {
let transaction = self.begin().await.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}

/// Execute the function inside a transaction with isolation level and/or access mode.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
#[instrument(level = "trace", skip(_callback))]
async fn transaction_with_config<F, T, E>(
&self,
_callback: F,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::error::Error + Send,
{
let transaction = self
.begin_with_config(isolation_level, access_mode)
.await
.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}
}

/// Defines errors for handling transaction failures
Expand Down
Loading