Skip to content

Commit

Permalink
fix(mysql): handle multiple waiting results correctly (#1439)
Browse files Browse the repository at this point in the history
* test(mysql): add test case for pending rows and dropped transaction

* fix(mysql): handle multiple waiting results correctly
  • Loading branch information
eagletmt authored Sep 22, 2021
1 parent 24c0d52 commit 593364f
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 23 deletions.
12 changes: 6 additions & 6 deletions sqlx-core/src/mysql/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::ext::ustr::UStr;
use crate::logger::QueryLogger;
use crate::mysql::connection::stream::Busy;
use crate::mysql::connection::stream::Waiting;
use crate::mysql::io::MySqlBufExt;
use crate::mysql::protocol::response::Status;
use crate::mysql::protocol::statement::{
Expand Down Expand Up @@ -93,7 +93,7 @@ impl MySqlConnection {
let mut logger = QueryLogger::new(sql, self.log_settings.clone());

self.stream.wait_until_ready().await?;
self.stream.busy = Busy::Result;
self.stream.waiting.push_back(Waiting::Result);

Ok(Box::pin(try_stream! {
// make a slot for the shared column data
Expand Down Expand Up @@ -146,12 +146,12 @@ impl MySqlConnection {
continue;
}

self.stream.busy = Busy::NotBusy;
self.stream.waiting.pop_front();
return Ok(());
}

// otherwise, this first packet is the start of the result-set metadata,
self.stream.busy = Busy::Row;
*self.stream.waiting.front_mut().unwrap() = Waiting::Row;

let num_columns = packet.get_uint_lenenc() as usize; // column count

Expand Down Expand Up @@ -179,11 +179,11 @@ impl MySqlConnection {

if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
// more result sets exist, continue to the next one
self.stream.busy = Busy::Result;
*self.stream.waiting.front_mut().unwrap() = Waiting::Result;
break;
}

self.stream.busy = Busy::NotBusy;
self.stream.waiting.pop_front();
return Ok(());
}

Expand Down
2 changes: 1 addition & 1 deletion sqlx-core/src/mysql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod executor;
mod stream;
mod tls;

pub(crate) use stream::{Busy, MySqlStream};
pub(crate) use stream::{MySqlStream, Waiting};

const MAX_PACKET_SIZE: u32 = 1024;

Expand Down
27 changes: 13 additions & 14 deletions sqlx-core/src/mysql/connection/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};

use bytes::{Buf, Bytes};
Expand All @@ -16,15 +17,13 @@ pub struct MySqlStream {
pub(crate) server_version: (u16, u16, u16),
pub(super) capabilities: Capabilities,
pub(crate) sequence_id: u8,
pub(crate) busy: Busy,
pub(crate) waiting: VecDeque<Waiting>,
pub(crate) charset: CharSet,
pub(crate) collation: Collation,
}

#[derive(Debug, PartialEq, Eq)]
pub(crate) enum Busy {
NotBusy,

pub(crate) enum Waiting {
// waiting for a result set
Result,

Expand Down Expand Up @@ -65,7 +64,7 @@ impl MySqlStream {
}

Ok(Self {
busy: Busy::NotBusy,
waiting: VecDeque::new(),
capabilities,
server_version: (0, 0, 0),
sequence_id: 0,
Expand All @@ -80,32 +79,32 @@ impl MySqlStream {
self.stream.flush().await?;
}

while self.busy != Busy::NotBusy {
while self.busy == Busy::Row {
while !self.waiting.is_empty() {
while self.waiting.front() == Some(&Waiting::Row) {
let packet = self.recv_packet().await?;

if packet[0] == 0xfe && packet.len() < 9 {
let eof = packet.eof(self.capabilities)?;

self.busy = if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
Busy::Result
if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
*self.waiting.front_mut().unwrap() = Waiting::Result;
} else {
Busy::NotBusy
self.waiting.pop_front();
};
}
}

while self.busy == Busy::Result {
while self.waiting.front() == Some(&Waiting::Result) {
let packet = self.recv_packet().await?;

if packet[0] == 0x00 || packet[0] == 0xff {
let ok = packet.ok()?;

if !ok.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
self.busy = Busy::NotBusy;
self.waiting.pop_front();
}
} else {
self.busy = Busy::Row;
*self.waiting.front_mut().unwrap() = Waiting::Row;
self.skip_result_metadata(packet).await?;
}
}
Expand Down Expand Up @@ -150,7 +149,7 @@ impl MySqlStream {
// TODO: packet joining

if payload[0] == 0xff {
self.busy = Busy::NotBusy;
self.waiting.pop_front();

// instead of letting this packet be looked at everywhere, we check here
// and emit a proper Error
Expand Down
4 changes: 2 additions & 2 deletions sqlx-core/src/mysql/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures_core::future::BoxFuture;

use crate::error::Error;
use crate::executor::Executor;
use crate::mysql::connection::Busy;
use crate::mysql::connection::Waiting;
use crate::mysql::protocol::text::Query;
use crate::mysql::{MySql, MySqlConnection};
use crate::transaction::{
Expand Down Expand Up @@ -57,7 +57,7 @@ impl TransactionManager for MySqlTransactionManager {
let depth = conn.transaction_depth;

if depth > 0 {
conn.stream.busy = Busy::Result;
conn.stream.waiting.push_back(Waiting::Result);
conn.stream.sequence_id = 0;
conn.stream
.write_packet(Query(&*rollback_ansi_transaction_sql(depth)));
Expand Down
59 changes: 59 additions & 0 deletions tests/mysql/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,62 @@ async fn test_issue_622() -> anyhow::Result<()> {

Ok(())
}

#[sqlx_macros::test]
async fn it_can_work_with_transactions() -> anyhow::Result<()> {
let mut conn = new::<MySql>().await?;
conn.execute("CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY);")
.await?;

// begin .. rollback

let mut tx = conn.begin().await?;
sqlx::query("INSERT INTO users (id) VALUES (?)")
.bind(1_i32)
.execute(&mut tx)
.await?;
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(&mut tx)
.await?;
assert_eq!(count, 1);
tx.rollback().await?;
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(&mut conn)
.await?;
assert_eq!(count, 0);

// begin .. commit

let mut tx = conn.begin().await?;
sqlx::query("INSERT INTO users (id) VALUES (?)")
.bind(1_i32)
.execute(&mut tx)
.await?;
tx.commit().await?;
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(&mut conn)
.await?;
assert_eq!(count, 1);

// begin .. (drop)

{
let mut tx = conn.begin().await?;

sqlx::query("INSERT INTO users (id) VALUES (?)")
.bind(2)
.execute(&mut tx)
.await?;
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(&mut tx)
.await?;
assert_eq!(count, 2);
// tx is dropped
}
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(&mut conn)
.await?;
assert_eq!(count, 1);

Ok(())
}

0 comments on commit 593364f

Please sign in to comment.