Skip to content

Commit

Permalink
Change sqlx-sqlite to check for uniqueness error instead of checking …
Browse files Browse the repository at this point in the history
…for existence

Currently, the sqlx-sqlite store has an issue with databases running
using the WAL journal if multiple requests come in at the same time,
causing a "database is locked" error. The sequence is roughly this:

1. Two connections call `SqliteStore::create`, at the same time.
2. Concurrently, they both call `SqliteStore::id_exists`. Both lock the
   database for reading and sets their "end mark" to the current
   database state.
3. Connection 1 calls `save_with_conn` first and commits its
   transaction. This upgrades its lock to a write lock and advances the
   database state - connection 2's end mark is still before this update.
4. Connection 2 calls `save_with_conn`. It attempts to upgrade its read
   lock to a write lock, but fails since the database has been modified
   since the read lock was acquired, and sqlite returns a "database is
   locked" error.

This patch fixes the issue by removing the `if_exists` check and instead
attempts to insert the ID, check if the insert fails with a uniqueness
error, and regenerate the ID if so. With this, all connections
immediately lock the database for writing and will wait for each other
to finish before inserting. It should also be more performant in the
common case of the generated ID being unique.
  • Loading branch information
ColonelThirtyTwo committed Oct 3, 2024
1 parent d7db77d commit 60992c8
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions sqlx-store/src/sqlite_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,30 @@ impl SqliteStore {
Ok(())
}

async fn id_exists(&self, conn: &mut SqliteConnection, id: &Id) -> session_store::Result<bool> {
async fn try_create_with_conn(
&self,
conn: &mut SqliteConnection,
record: &Record,
) -> session_store::Result<bool> {
let query = format!(
r#"
select exists(select 1 from {table_name} where id = ?)
insert or abort into {table_name}
(id, data, expiry_date) values (?, ?, ?)
"#,
table_name = self.table_name
);
let res = sqlx::query(&query)
.bind(record.id.to_string())
.bind(rmp_serde::to_vec(record).map_err(SqlxStoreError::Encode)?)
.bind(record.expiry_date)
.execute(conn)
.await;

Ok(sqlx::query_scalar(&query)
.bind(id.to_string())
.fetch_one(conn)
.await
.map_err(SqlxStoreError::Sqlx)?)
match res {
Ok(_) => Ok(true),
Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Ok(false),
Err(e) => Err(SqlxStoreError::Sqlx(e).into()),
}
}

async fn save_with_conn(
Expand Down Expand Up @@ -133,10 +144,9 @@ impl SessionStore for SqliteStore {
async fn create(&self, record: &mut Record) -> session_store::Result<()> {
let mut tx = self.pool.begin().await.map_err(SqlxStoreError::Sqlx)?;

while self.id_exists(&mut tx, &record.id).await? {
while !self.try_create_with_conn(&mut tx, record).await? {
record.id = Id::default(); // Generate a new ID
}
self.save_with_conn(&mut tx, record).await?;

tx.commit().await.map_err(SqlxStoreError::Sqlx)?;

Expand Down

0 comments on commit 60992c8

Please sign in to comment.