Skip to content

Commit

Permalink
fix(db): Fix / extend transaction isolation levels (#2350)
Browse files Browse the repository at this point in the history
## What ❔

- Makes readonly Postgres transaction have "repeatable read" isolation
level by default.
- Allows specifying an isolation level in the transaction builder.

## Why ❔

Readonly transactions usually expect a consistent DB view, hence the
"repeatable read" isolation level.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli authored Jul 2, 2024
1 parent 107e1a7 commit 404ceb9
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/lib/dal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
pub use sqlx::{types::BigDecimal, Error as SqlxError};
use zksync_db_connection::connection::DbMarker;
pub use zksync_db_connection::{
connection::Connection,
connection::{Connection, IsolationLevel},
connection_pool::{ConnectionPool, ConnectionPoolBuilder},
error::{DalError, DalResult},
};
Expand Down
1 change: 1 addition & 0 deletions core/lib/db_connection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ tracing.workspace = true

[dev-dependencies]
assert_matches.workspace = true
test-casing.workspace = true
92 changes: 86 additions & 6 deletions core/lib/db_connection/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
Ok(TransactionBuilder {
connection: self,
is_readonly: false,
isolation_level: None,
})
}

Expand Down Expand Up @@ -280,11 +281,26 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
}
}

/// Transaction isolation level.
///
/// See [Postgres docs](https://www.postgresql.org/docs/14/transaction-iso.html) for details on isolation level semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IsolationLevel {
/// "Read committed" isolation level.
ReadCommitted,
/// "Repeatable read" isolation level (aka "snapshot isolation").
RepeatableRead,
/// Serializable isolation level.
Serializable,
}

/// Builder of transactions allowing to configure transaction characteristics (for now, just its readonly status).
#[derive(Debug)]
pub struct TransactionBuilder<'a, 'c, DB: DbMarker> {
connection: &'a mut Connection<'c, DB>,
is_readonly: bool,
isolation_level: Option<IsolationLevel>,
}

impl<'a, DB: DbMarker> TransactionBuilder<'a, '_, DB> {
Expand All @@ -294,12 +310,40 @@ impl<'a, DB: DbMarker> TransactionBuilder<'a, '_, DB> {
self
}

/// Sets the isolation level of this transaction. If this method is not called, the isolation level will be
/// "read committed" (the default Postgres isolation level) for read-write transactions, and "repeatable read"
/// for readonly transactions. Beware that setting high isolation level for read-write transactions may lead
/// to performance degradation and/or isolation-related errors.
pub fn set_isolation(mut self, level: IsolationLevel) -> Self {
self.isolation_level = Some(level);
self
}

/// Builds the transaction with the provided characteristics.
pub async fn build(self) -> DalResult<Connection<'a, DB>> {
let mut transaction = self.connection.start_transaction().await?;

let level = self.isolation_level.unwrap_or(if self.is_readonly {
IsolationLevel::RepeatableRead
} else {
IsolationLevel::ReadCommitted
});
let level = match level {
IsolationLevel::ReadCommitted => "READ COMMITTED",
IsolationLevel::RepeatableRead => "REPEATABLE READ",
IsolationLevel::Serializable => "SERIALIZABLE",
};
let mut set_transaction_args = format!(" ISOLATION LEVEL {level}");

if self.is_readonly {
sqlx::query("SET TRANSACTION READ ONLY")
set_transaction_args += " READ ONLY";
}

if !set_transaction_args.is_empty() {
sqlx::query(&format!("SET TRANSACTION{set_transaction_args}"))
.instrument("set_transaction_characteristics")
.with_arg("isolation_level", &self.isolation_level)
.with_arg("readonly", &self.is_readonly)
.execute(&mut transaction)
.await?;
}
Expand All @@ -309,6 +353,8 @@ impl<'a, DB: DbMarker> TransactionBuilder<'a, '_, DB> {

#[cfg(test)]
mod tests {
use test_casing::test_casing;

use super::*;

#[tokio::test]
Expand Down Expand Up @@ -344,17 +390,51 @@ mod tests {
}
}

const ISOLATION_LEVELS: [Option<IsolationLevel>; 4] = [
None,
Some(IsolationLevel::ReadCommitted),
Some(IsolationLevel::RepeatableRead),
Some(IsolationLevel::Serializable),
];

#[test_casing(4, ISOLATION_LEVELS)]
#[tokio::test]
async fn creating_readonly_transaction() {
async fn setting_isolation_level_for_transaction(level: Option<IsolationLevel>) {
let pool = ConnectionPool::<InternalMarker>::constrained_test_pool(1).await;
let mut connection = pool.connection().await.unwrap();
let mut readonly_transaction = connection
.transaction_builder()
let mut transaction_builder = connection.transaction_builder().unwrap();
if let Some(level) = level {
transaction_builder = transaction_builder.set_isolation(level);
}

let mut transaction = transaction_builder.build().await.unwrap();
assert!(transaction.in_transaction());

sqlx::query("SELECT COUNT(*) AS \"count?\" FROM miniblocks")
.instrument("test")
.fetch_optional(&mut transaction)
.await
.unwrap()
.set_readonly()
.build()
.expect("no row returned");
// Check that it's possible to execute write statements in the transaction.
sqlx::query("DELETE FROM miniblocks")
.instrument("test")
.execute(&mut transaction)
.await
.unwrap();
}

#[test_casing(4, ISOLATION_LEVELS)]
#[tokio::test]
async fn creating_readonly_transaction(level: Option<IsolationLevel>) {
let pool = ConnectionPool::<InternalMarker>::constrained_test_pool(1).await;
let mut connection = pool.connection().await.unwrap();
let mut transaction_builder = connection.transaction_builder().unwrap().set_readonly();
if let Some(level) = level {
transaction_builder = transaction_builder.set_isolation(level);
}

let mut readonly_transaction = transaction_builder.build().await.unwrap();
assert!(readonly_transaction.in_transaction());

sqlx::query("SELECT COUNT(*) AS \"count?\" FROM miniblocks")
Expand Down

0 comments on commit 404ceb9

Please sign in to comment.