diff --git a/sqlx-core/src/sqlite/connection/worker.rs b/sqlx-core/src/sqlite/connection/worker.rs index b737fbf386..9bf2d825d1 100644 --- a/sqlx-core/src/sqlite/connection/worker.rs +++ b/sqlx-core/src/sqlite/connection/worker.rs @@ -55,13 +55,13 @@ enum Command { tx: flume::Sender, Error>>, }, Begin { - tx: oneshot::Sender>, + tx: rendezvous_oneshot::Sender>, }, Commit { - tx: oneshot::Sender>, + tx: rendezvous_oneshot::Sender>, }, Rollback { - tx: Option>>, + tx: Option>>, }, CreateCollation { create_collation: @@ -116,6 +116,11 @@ impl ConnectionWorker { return; } + // If COMMIT or ROLLBACK is processed but not acknowledged, there would be another + // ROLLBACK sent when the `Transaction` drops. We need to ignore it otherwise we + // would rollback an already completed transaction. + let mut ignore_next_start_rollback = false; + for cmd in command_rx { match cmd { Command::Prepare { query, tx } => { @@ -162,8 +167,27 @@ impl ConnectionWorker { .map(|_| { conn.transaction_depth += 1; }); - - tx.send(res).ok(); + let res_ok = res.is_ok(); + + if tx.blocking_send(res).is_err() && res_ok { + // The BEGIN was processed but not acknowledged. This means no + // `Transaction` was created and so there is no way to commit / + // rollback this transaction. We need to roll it back + // immediately otherwise it would remain started forever. + if let Err(e) = conn + .handle + .exec(rollback_ansi_transaction_sql(depth + 1)) + .map(|_| { + conn.transaction_depth -= 1; + }) + { + // The rollback failed. To prevent leaving the connection + // in an inconsistent state we shutdown this worker which + // causes any subsequent operation on the connection to fail. + log::error!("failed to rollback cancelled transaction: {}", e); + break; + } + } } Command::Commit { tx } => { let depth = conn.transaction_depth; @@ -177,10 +201,21 @@ impl ConnectionWorker { } else { Ok(()) }; + let res_ok = res.is_ok(); - tx.send(res).ok(); + if tx.blocking_send(res).is_err() && res_ok { + // The COMMIT was processed but not acknowledged. This means that + // the `Transaction` doesn't know it was committed and will try to + // rollback on drop. We need to ignore that rollback. + ignore_next_start_rollback = true; + } } Command::Rollback { tx } => { + if ignore_next_start_rollback && tx.is_none() { + ignore_next_start_rollback = false; + continue; + } + let depth = conn.transaction_depth; let res = if depth > 0 { @@ -193,8 +228,16 @@ impl ConnectionWorker { Ok(()) }; + let res_ok = res.is_ok(); + if let Some(tx) = tx { - tx.send(res).ok(); + if tx.blocking_send(res).is_err() && res_ok { + // The ROLLBACK was processed but not acknowledged. This means + // that the `Transaction` doesn't know it was rolled back and + // will try to rollback again on drop. We need to ignore that + // rollback. + ignore_next_start_rollback = true; + } } } Command::CreateCollation { create_collation } => { @@ -268,15 +311,17 @@ impl ConnectionWorker { } pub(crate) async fn begin(&mut self) -> Result<(), Error> { - self.oneshot_cmd(|tx| Command::Begin { tx }).await? + self.oneshot_cmd_with_ack(|tx| Command::Begin { tx }) + .await? } pub(crate) async fn commit(&mut self) -> Result<(), Error> { - self.oneshot_cmd(|tx| Command::Commit { tx }).await? + self.oneshot_cmd_with_ack(|tx| Command::Commit { tx }) + .await? } pub(crate) async fn rollback(&mut self) -> Result<(), Error> { - self.oneshot_cmd(|tx| Command::Rollback { tx: Some(tx) }) + self.oneshot_cmd_with_ack(|tx| Command::Rollback { tx: Some(tx) }) .await? } @@ -304,6 +349,20 @@ impl ConnectionWorker { rx.await.map_err(|_| Error::WorkerCrashed) } + async fn oneshot_cmd_with_ack(&mut self, command: F) -> Result + where + F: FnOnce(rendezvous_oneshot::Sender) -> Command, + { + let (tx, rx) = rendezvous_oneshot::channel(); + + self.command_tx + .send_async(command(tx)) + .await + .map_err(|_| Error::WorkerCrashed)?; + + rx.recv().await.map_err(|_| Error::WorkerCrashed) + } + pub fn create_collation( &mut self, name: &str, @@ -387,3 +446,41 @@ fn prepare(conn: &mut ConnectionState, query: &str) -> Result() -> (Sender, Receiver) { + let (inner_tx, inner_rx) = oneshot::channel(); + (Sender { inner: inner_tx }, Receiver { inner: inner_rx }) + } + + pub struct Sender { + inner: oneshot::Sender<(T, oneshot::Sender<()>)>, + } + + impl Sender { + pub async fn send(self, value: T) -> Result<(), Canceled> { + let (ack_tx, ack_rx) = oneshot::channel(); + self.inner.send((value, ack_tx)).map_err(|_| Canceled)?; + ack_rx.await + } + + pub fn blocking_send(self, value: T) -> Result<(), Canceled> { + futures_executor::block_on(self.send(value)) + } + } + + pub struct Receiver { + inner: oneshot::Receiver<(T, oneshot::Sender<()>)>, + } + + impl Receiver { + pub async fn recv(self) -> Result { + let (value, ack_tx) = self.inner.await?; + ack_tx.send(()).map_err(|_| Canceled)?; + Ok(value) + } + } +}