Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/transaction options #1924

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions sqlx-core/src/acquire.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::database::Database;
use crate::error::Error;
use crate::pool::{MaybePoolConnection, Pool, PoolConnection};
use crate::transaction::Transaction;
use crate::transaction::{Transaction, TransactionManager};
use futures_core::future::BoxFuture;
use std::ops::{Deref, DerefMut};

Expand Down Expand Up @@ -77,6 +77,11 @@ pub trait Acquire<'c> {
fn acquire(self) -> BoxFuture<'c, Result<Self::Connection, Error>>;

fn begin(self) -> BoxFuture<'c, Result<Transaction<'c, Self::Database>, Error>>;

fn begin_with(
self,
options: <<Self::Database as Database>::TransactionManager as TransactionManager>::Options,
) -> BoxFuture<'c, Result<Transaction<'c, Self::Database>, Error>>;
}

impl<'a, DB: Database> Acquire<'a> for &'_ Pool<DB> {
Expand All @@ -92,7 +97,22 @@ impl<'a, DB: Database> Acquire<'a> for &'_ Pool<DB> {
let conn = self.acquire();

Box::pin(async move {
Transaction::begin(MaybePoolConnection::PoolConnection(conn.await?)).await
Transaction::begin_with(
MaybePoolConnection::PoolConnection(conn.await?),
Default::default(),
)
.await
})
}

fn begin_with(
self,
options: <<Self::Database as Database>::TransactionManager as TransactionManager>::Options,
) -> BoxFuture<'static, Result<Transaction<'a, DB>, Error>> {
let conn = self.acquire();

Box::pin(async move {
Transaction::begin_with(MaybePoolConnection::PoolConnection(conn.await?), options).await
})
}
}
Expand Down Expand Up @@ -120,7 +140,18 @@ macro_rules! impl_acquire {
'c,
Result<crate::transaction::Transaction<'c, $DB>, crate::error::Error>,
> {
crate::transaction::Transaction::begin(self)
crate::transaction::Transaction::begin_with(self, Default::default())
}

#[inline]
fn begin_with(
self,
options: <<Self::Database as crate::database::Database>::TransactionManager as crate::transaction::TransactionManager>::Options,
) -> futures_core::future::BoxFuture<
'c,
Result<crate::transaction::Transaction<'c, $DB>, crate::error::Error>,
> {
crate::transaction::Transaction::begin_with(self, options)
}
}

Expand All @@ -144,7 +175,18 @@ macro_rules! impl_acquire {
'c,
Result<crate::transaction::Transaction<'c, $DB>, crate::error::Error>,
> {
crate::transaction::Transaction::begin(&mut **self)
crate::transaction::Transaction::begin_with(&mut **self, Default::default())
}

#[inline]
fn begin_with(
self,
options: <<Self::Database as crate::database::Database>::TransactionManager as crate::transaction::TransactionManager>::Options,
) -> futures_core::future::BoxFuture<
'c,
Result<crate::transaction::Transaction<'c, $DB>, crate::error::Error>,
> {
crate::transaction::Transaction::begin_with(&mut **self, options)
}
}

Expand All @@ -170,7 +212,18 @@ macro_rules! impl_acquire {
't,
Result<crate::transaction::Transaction<'t, $DB>, crate::error::Error>,
> {
crate::transaction::Transaction::begin(&mut **self)
crate::transaction::Transaction::begin_with(&mut **self, Default::default())
}

#[inline]
fn begin_with(
self,
options: <<Self::Database as crate::database::Database>::TransactionManager as crate::transaction::TransactionManager>::Options,
) -> futures_core::future::BoxFuture<
't,
Result<crate::transaction::Transaction<'t, $DB>, crate::error::Error>,
> {
crate::transaction::Transaction::begin_with(&mut **self, options)
}
}
};
Expand Down
15 changes: 14 additions & 1 deletion sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::database::{Database, HasStatementCache};
use crate::error::Error;
use crate::transaction::Transaction;
use crate::transaction::{Transaction, TransactionManager};
use futures_core::future::BoxFuture;
use log::LevelFilter;
use std::fmt::Debug;
Expand Down Expand Up @@ -33,6 +33,19 @@ pub trait Connection: Send {
///
/// Returns a [`Transaction`] for controlling and tracking the new transaction.
fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
where
Self: Sized,
{
Self::begin_with(self, Default::default())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not take the same approach for the other impls (like impl Acquire for Pool)?

}

/// Begin a new transaction or establish a savepoint within the active transaction.
///
/// Returns a [`Transaction`] for controlling and tracking the new transaction.
Comment on lines +42 to +44

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say something about options.

fn begin_with(
&mut self,
options: <<Self::Database as Database>::TransactionManager as TransactionManager>::Options,
) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
where
Self: Sized;

Expand Down
4 changes: 2 additions & 2 deletions sqlx-core/src/mssql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ impl Connection for MssqlConnection {
self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
}

fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
fn begin_with(&mut self, options: ()) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
where
Self: Sized,
{
Transaction::begin(self)
Transaction::begin_with(self, options)
}

#[doc(hidden)]
Expand Down
3 changes: 2 additions & 1 deletion sqlx-core/src/mssql/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ pub struct MssqlTransactionManager;

impl TransactionManager for MssqlTransactionManager {
type Database = Mssql;
type Options = ();

fn begin(conn: &mut MssqlConnection) -> BoxFuture<'_, Result<(), Error>> {
fn begin_with(conn: &mut MssqlConnection, options: ()) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.stream.transaction_depth;

Expand Down
8 changes: 6 additions & 2 deletions sqlx-core/src/mysql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::error::Error;
use crate::mysql::protocol::statement::StmtClose;
use crate::mysql::protocol::text::{Ping, Quit};
use crate::mysql::statement::MySqlStatementMetadata;
use crate::mysql::transaction::MySqlTransactionOptions;
use crate::mysql::{MySql, MySqlConnectOptions};
use crate::transaction::Transaction;
use futures_core::future::BoxFuture;
Expand Down Expand Up @@ -101,10 +102,13 @@ impl Connection for MySqlConnection {
!self.stream.wbuf.is_empty()
}

fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
fn begin_with(
&mut self,
options: MySqlTransactionOptions,
) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
where
Self: Sized,
{
Transaction::begin(self)
Transaction::begin_with(self, options)
}
}
2 changes: 1 addition & 1 deletion sqlx-core/src/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use options::{MySqlConnectOptions, MySqlSslMode};
pub use query_result::MySqlQueryResult;
pub use row::MySqlRow;
pub use statement::MySqlStatement;
pub use transaction::MySqlTransactionManager;
pub use transaction::{MySqlIsolationLevel, MySqlTransactionManager, MySqlTransactionOptions};
pub use type_info::MySqlTypeInfo;
pub use value::{MySqlValue, MySqlValueFormat, MySqlValueRef};

Expand Down
125 changes: 110 additions & 15 deletions sqlx-core/src/mysql/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,29 @@ use crate::mysql::connection::Waiting;
use crate::mysql::protocol::text::Query;
use crate::mysql::{MySql, MySqlConnection};
use crate::transaction::{
begin_ansi_transaction_sql, commit_ansi_transaction_sql, rollback_ansi_transaction_sql,
TransactionManager,
begin_savepoint_sql, commit_savepoint_sql, rollback_savepoint_sql, TransactionManager,
COMMIT_ANSI_TRANSACTION, ROLLBACK_ANSI_TRANSACTION,
};

/// Implementation of [`TransactionManager`] for MySQL.
pub struct MySqlTransactionManager;

impl TransactionManager for MySqlTransactionManager {
type Database = MySql;
type Options = MySqlTransactionOptions;

fn begin(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
fn begin_with(
conn: &mut MySqlConnection,
options: MySqlTransactionOptions,
) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;

conn.execute(&*begin_ansi_transaction_sql(depth)).await?;
let stmt = if depth == 0 {
options.sql()
} else {
begin_savepoint_sql(depth)
};
conn.execute(&*stmt).await?;
conn.transaction_depth = depth + 1;

Ok(())
Expand All @@ -30,10 +38,14 @@ impl TransactionManager for MySqlTransactionManager {
fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;

if depth > 0 {
conn.execute(&*commit_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth - 1;
let stmt = if depth == 1 {
COMMIT_ANSI_TRANSACTION.to_string()
} else {
commit_savepoint_sql(depth)
};
conn.execute(&*stmt).await?;
conn.transaction_depth -= 1;
}

Ok(())
Expand All @@ -43,10 +55,14 @@ impl TransactionManager for MySqlTransactionManager {
fn rollback(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this var?


if depth > 0 {
conn.execute(&*rollback_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth - 1;
let stmt = if depth == 1 {
ROLLBACK_ANSI_TRANSACTION.to_string()
} else {
rollback_savepoint_sql(depth)
};
conn.execute(&*stmt).await?;
conn.transaction_depth -= 1;
}

Ok(())
Expand All @@ -55,14 +71,93 @@ impl TransactionManager for MySqlTransactionManager {

fn start_rollback(conn: &mut MySqlConnection) {
let depth = conn.transaction_depth;

if depth > 0 {
conn.stream.waiting.push_back(Waiting::Result);
conn.stream.sequence_id = 0;
conn.stream
.write_packet(Query(&*rollback_ansi_transaction_sql(depth)));
if depth == 1 {
conn.stream.write_packet(Query(ROLLBACK_ANSI_TRANSACTION));
} else {
conn.stream
.write_packet(Query(&rollback_savepoint_sql(depth)));
}
conn.transaction_depth -= 1;
}
}
}

/// Transaction initiation options for MySQL.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct MySqlTransactionOptions {
pub(crate) consistent_read: bool,
pub(crate) read_only: bool,
pub(crate) iso_level: Option<MySqlIsolationLevel>,
}

impl MySqlTransactionOptions {
pub fn consistent_read(mut self) -> Self {
self.consistent_read = true;
self
}

pub fn read_only(mut self) -> Self {
self.read_only = true;
self
}

conn.transaction_depth = depth - 1;
pub fn isolation_level(mut self, level: MySqlIsolationLevel) -> Self {
self.iso_level.replace(level);
self
}

pub(crate) fn sql(&self) -> String {
let mut sql = String::with_capacity(64);
if let Some(level) = self.iso_level {
sql.push_str("SET TRANSACTION");
match level {
MySqlIsolationLevel::Serializable => sql.push_str(" ISOLATION LEVEL SERIALIZABLE"),
MySqlIsolationLevel::RepeatableRead => {
sql.push_str(" ISOLATION LEVEL REPEATABLE READ")
}
MySqlIsolationLevel::ReadCommitted => {
sql.push_str(" ISOLATION LEVEL READ COMMITTED")
}
MySqlIsolationLevel::ReadUncommitted => {
sql.push_str(" ISOLATION LEVEL READ UNCOMMITTED")
}
}
sql.push_str("; ");
}
sql.push_str("START TRANSACTION");
if self.read_only {
sql.push_str(" READ ONLY");
}
if self.consistent_read {
sql.push_str(" WITH CONSISTENT SNAPSHOT");
}
sql
}
}

impl From<MySqlIsolationLevel> for MySqlTransactionOptions {
fn from(iso_level: MySqlIsolationLevel) -> Self {
Self {
iso_level: Some(iso_level),
..Default::default()
}
}
}

/// Transaction isolation levels for MySQL.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MySqlIsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
}

impl Default for MySqlIsolationLevel {
fn default() -> Self {
Self::RepeatableRead
}
}
Loading