Skip to content

Commit

Permalink
Refine edge case handling: add optional predicate to get_next_record()
Browse files Browse the repository at this point in the history
  • Loading branch information
jussisaurio committed Oct 8, 2024
1 parent 43038cb commit 556f4b7
Showing 1 changed file with 65 additions and 20 deletions.
85 changes: 65 additions & 20 deletions core/storage/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ impl BTreeCursor {
Ok(CursorResult::Ok(page.cell_count() == 0))
}

fn get_next_record(&mut self) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
fn get_next_record<'a>(
&mut self,
predicate: Option<(SeekKey<'a>, SeekOp)>,
) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
loop {
let mem_page = self.get_mem_page();
let page_idx = mem_page.page_idx;
Expand Down Expand Up @@ -134,6 +137,7 @@ impl BTreeCursor {
_left_child_page,
_rowid,
}) => {
assert!(predicate.is_none());
mem_page.advance();
let mem_page =
MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0);
Expand All @@ -145,6 +149,7 @@ impl BTreeCursor {
_payload,
first_overflow_page: _,
}) => {
assert!(predicate.is_none());
mem_page.advance();
let record = crate::storage::sqlite3_ondisk::read_record(_payload)?;
return Ok(CursorResult::Ok((Some(*_rowid), Some(record))));
Expand All @@ -154,31 +159,72 @@ impl BTreeCursor {
left_child_page,
..
}) => {
if self.going_upwards {
self.going_upwards = false;
mem_page.advance();
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
if !self.going_upwards {
let mem_page =
MemPage::new(Some(mem_page.clone()), *left_child_page as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
continue;
}

self.going_upwards = false;
mem_page.advance();

let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
if predicate.is_none() {
let rowid = match record.values.last() {
Some(OwnedValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
};
return Ok(CursorResult::Ok((Some(rowid), Some(record))));
}

let (key, op) = predicate.as_ref().unwrap();
let SeekKey::IndexKey(index_key) = key else {
unreachable!("index seek key should be a record");
};
let found = match op {
SeekOp::GT => &record > *index_key,
SeekOp::GE => &record >= *index_key,
SeekOp::EQ => &record == *index_key,
};
if found {
let rowid = match record.values.last() {
Some(OwnedValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
};
return Ok(CursorResult::Ok((Some(rowid), Some(record))));
} else {
let mem_page =
MemPage::new(Some(mem_page.clone()), *left_child_page as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
continue;
}
}
BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => {
mem_page.advance();
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
let rowid = match record.values.last() {
Some(OwnedValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
if predicate.is_none() {
let rowid = match record.values.last() {
Some(OwnedValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
};
return Ok(CursorResult::Ok((Some(rowid), Some(record))));
}
let (key, op) = predicate.as_ref().unwrap();
let SeekKey::IndexKey(index_key) = key else {
unreachable!("index seek key should be a record");
};

return Ok(CursorResult::Ok((Some(rowid), Some(record))));
let found = match op {
SeekOp::GT => &record > *index_key,
SeekOp::GE => &record >= *index_key,
SeekOp::EQ => &record == *index_key,
};
if found {
let rowid = match record.values.last() {
Some(OwnedValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
};
return Ok(CursorResult::Ok((Some(rowid), Some(record))));
} else {
continue;
}
}
}
}
Expand Down Expand Up @@ -273,10 +319,9 @@ impl BTreeCursor {
//
// In the seek() method, once we have landed in the leaf page and find that there is no cell with a key greater than K,
// if we were to return Ok(CursorResult::Ok((None, None))), self.record would be None, which is incorrect, because we already know
// that there is a record with a key greater than K (K' = K+2) in the parent interior cell. Hence, we need to move back up to the parent
// interior cell and get the next record from there. self.get_next_record() does that automatically by setting self.going_upwards to true
// and then obtaining the next record from the parent interior cell.
return self.get_next_record();
// that there is a record with a key greater than K (K' = K+2) in the parent interior cell. Hence, we need to move back up the tree
// and get the next matching record from there.
return self.get_next_record(Some((key, op)));
}

Ok(CursorResult::Ok((None, None)))
Expand Down Expand Up @@ -1344,7 +1389,7 @@ fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount
impl Cursor for BTreeCursor {
fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
self.move_to_rightmost()?;
match self.get_next_record()? {
match self.get_next_record(None)? {
CursorResult::Ok((rowid, next)) => {
if rowid.is_none() {
match self.is_empty_table()? {
Expand All @@ -1369,7 +1414,7 @@ impl Cursor for BTreeCursor {
fn rewind(&mut self) -> Result<CursorResult<()>> {
let mem_page = MemPage::new(None, self.root_page, 0);
self.page.replace(Some(Rc::new(mem_page)));
match self.get_next_record()? {
match self.get_next_record(None)? {
CursorResult::Ok((rowid, next)) => {
self.rowid.replace(rowid);
self.record.replace(next);
Expand All @@ -1380,7 +1425,7 @@ impl Cursor for BTreeCursor {
}

fn next(&mut self) -> Result<CursorResult<()>> {
match self.get_next_record()? {
match self.get_next_record(None)? {
CursorResult::Ok((rowid, next)) => {
self.rowid.replace(rowid);
self.record.replace(next);
Expand Down

0 comments on commit 556f4b7

Please sign in to comment.