Skip to content

Commit

Permalink
Add method to run async transaction without retries.
Browse files Browse the repository at this point in the history
  • Loading branch information
ISibboI committed Aug 20, 2023
1 parent 743f489 commit 1bfdf91
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 64 deletions.
80 changes: 66 additions & 14 deletions backend/rvoc-backend/src/database/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl RVocAsyncDatabaseConnectionPool {
pub async fn execute_transaction_with_retries<
'b,
ReturnType: 'b + Send,
PermanentErrorType: PermanentTransactionError,
PermanentErrorType: PermanentTransactionError + TooManyTemporaryTransactionErrors,
>(
&self,
transaction: impl for<'r> Fn(
Expand Down Expand Up @@ -56,9 +56,47 @@ impl RVocAsyncDatabaseConnectionPool {
}
}

Err(PermanentTransactionError::too_many_temporary_errors(
max_retries,
))
Err(PermanentErrorType::too_many_temporary_errors(max_retries))
}

/// Execute an asynchronous database transaction.
#[instrument(err, skip(self, transaction))]
pub async fn execute_transaction<
'b,
ReturnType: 'b + Send,
ErrorType: 'b + Send + PermanentTransactionError,
>(
&self,
transaction: impl for<'r> FnOnce(
&'r mut AsyncPgConnection,
) -> diesel_async::scoped_futures::ScopedBoxFuture<
'b,
'r,
Result<ReturnType, ErrorType>,
> + Send
+ Sync,
) -> Result<ReturnType, ErrorType> {
let mut database_connection = self.implementation.get().await.map_err(|error| {
ErrorType::permanent_error(Box::new(RVocError::DatabaseConnection {
source: Box::new(error),
}))
})?;

database_connection
.build_transaction()
.serializable()
.run(|database_connection| {
Box::pin(async move {
transaction(database_connection)
.await
.map_err(|error| FromDieselError::ErrorType(error))
})
})
.await
.map_err(|error| match error {
FromDieselError::ErrorType(error) => error,
FromDieselError::Diesel(error) => ErrorType::permanent_error(Box::new(error)),
})
}
}

Expand All @@ -71,7 +109,7 @@ impl RVocSyncDatabaseConnection {
#[allow(dead_code)]
pub fn execute_sync_transaction_with_retries<
ReturnType,
PermanentErrorType: PermanentTransactionError,
PermanentErrorType: PermanentTransactionError + TooManyTemporaryTransactionErrors,
>(
&mut self,
transaction: impl Fn(&mut PgConnection) -> Result<ReturnType, TransactionError>,
Expand All @@ -97,9 +135,7 @@ impl RVocSyncDatabaseConnection {
}
}

Err(PermanentTransactionError::too_many_temporary_errors(
max_retries,
))
Err(PermanentErrorType::too_many_temporary_errors(max_retries))
}
}

Expand Down Expand Up @@ -127,20 +163,36 @@ impl From<diesel::result::Error> for TransactionError {

/// An error type that indicates a permanent transaction failure.
pub trait PermanentTransactionError: Error {
/// Construct the error instance representing "too many temporary errors".
/// The `limit` is the error limit that was reached.
fn too_many_temporary_errors(limit: u64) -> Self;

/// Construct the error instance representing a general permanent error.
fn permanent_error(source: BoxDynError) -> Self;
}

impl PermanentTransactionError for RVocError {
fn permanent_error(source: BoxDynError) -> Self {
Self::PermanentDatabaseTransactionError { source }
}
}

/// An error type that indicates a permanent transaction failure caused by too many temporary failures.
pub trait TooManyTemporaryTransactionErrors: Error {
/// Construct the error instance representing "too many temporary errors".
/// The `limit` is the error limit that was reached.
fn too_many_temporary_errors(limit: u64) -> Self;
}

impl TooManyTemporaryTransactionErrors for RVocError {
fn too_many_temporary_errors(limit: u64) -> Self {
Self::DatabaseTransactionRetryLimitReached { limit }
}
}

fn permanent_error(source: BoxDynError) -> Self {
Self::PermanentDatabaseTransactionError { source }
enum FromDieselError<ErrorType> {
ErrorType(ErrorType),
Diesel(diesel::result::Error),
}

impl<ErrorType> From<diesel::result::Error> for FromDieselError<ErrorType> {
fn from(value: diesel::result::Error) -> Self {
Self::Diesel(value)
}
}
2 changes: 1 addition & 1 deletion backend/rvoc-backend/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub enum RVocError {
ApiServerError { source: BoxDynError },

#[error("error while inserting session to database: {source}")]
InsertSession {source: BoxDynError},
InsertSession { source: BoxDynError },

#[error("data directory should be a directory, but is a file: {path:?}")]
DataDirectoryIsFile { path: PathBuf },
Expand Down
2 changes: 1 addition & 1 deletion backend/rvoc-backend/src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
configuration::Configuration,
database::RVocAsyncDatabaseConnectionPool,
error::{RVocError, RVocResult},
web::session::{RVocSessionStoreConnector, RVocSessionData},
web::session::{RVocSessionData, RVocSessionStoreConnector},
};

mod session;
Expand Down
81 changes: 34 additions & 47 deletions backend/rvoc-backend/src/web/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,53 +59,46 @@ impl SessionStoreConnector<RVocSessionData> for RVocSessionStoreConnector {
) -> Result<WriteSessionResult, typed_session::Error<Self::Error>> {
match self
.database_connection_pool
.execute_transaction_with_retries::<_, TryInsertSessionError>(
|database_connection| {
Box::pin(async {
use crate::database::schema::sessions::dsl::*;

RVocSessionInsertable::new(current_id, session_expiry, data)
.insert_into(sessions)
.execute(database_connection)
.await
.map_err(|error| match error {
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
database_error_information,
) => {
if database_error_information.table_name() == Some("sessions")
&& database_error_information.column_name() == Some("id")
{
TryInsertSessionError::SessionIdExists
} else {
TryInsertSessionError::Permanent(
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
database_error_information,
)
.into(),
.execute_transaction::<_, TryInsertSessionError>(|database_connection| {
Box::pin(async {
use crate::database::schema::sessions::dsl::*;

RVocSessionInsertable::new(current_id, session_expiry, data)
.insert_into(sessions)
.execute(database_connection)
.await
.map_err(|error| match error {
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
database_error_information,
) => {
if database_error_information.table_name() == Some("sessions")
&& database_error_information.column_name() == Some("id")
{
TryInsertSessionError::SessionIdExists
} else {
TryInsertSessionError::Error(
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
database_error_information,
)
}
.into(),
)
}
error => TryInsertSessionError::Permanent(error.into()),
})
.map_err(|error| TransactionError::Permanent(error.into()))?;

Ok(())
})
},
0,
)
}
error => TryInsertSessionError::Error(error.into()),
})?;

Ok(())
})
})
.await
{
Ok(()) => Ok(WriteSessionResult::Ok(())),
Err(TryInsertSessionError::SessionIdExists) => Ok(WriteSessionResult::SessionIdExists),
Err(TryInsertSessionError::Permanent(error)) => {
Err(TryInsertSessionError::Error(error)) => {
Err(RVocError::InsertSession { source: error })
}
Err(TryInsertSessionError::TooManyTemporaryErrors(amount)) => {
Err(RVocError::DatabaseTransactionRetryLimitReached { limit: amount })
}
}
.map_err(typed_session::Error::SessionStoreConnector)
}
Expand Down Expand Up @@ -168,19 +161,13 @@ impl<'a> RVocSessionInsertable<'a> {
#[derive(Debug, Error)]
enum TryInsertSessionError {
#[error("permanent transaction error: {0}")]
Permanent(BoxDynError),
#[error("too many temporary transaction errors: {0}")]
TooManyTemporaryErrors(u64),
Error(BoxDynError),
#[error("session id exists")]
SessionIdExists,
}

impl PermanentTransactionError for TryInsertSessionError {
fn too_many_temporary_errors(limit: u64) -> Self {
Self::TooManyTemporaryErrors(limit)
}

fn permanent_error(source: crate::error::BoxDynError) -> Self {
Self::Permanent(source)
Self::Error(source)
}
}
2 changes: 1 addition & 1 deletion backend/rvoc-backend/src/web/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ impl AsRef<str> for Username {
fn as_ref(&self) -> &str {
&self.name
}
}
}

0 comments on commit 1bfdf91

Please sign in to comment.