Skip to content

Commit

Permalink
[ENH] Add sparse index
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 7, 2024
1 parent 401f43a commit 76dc92c
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 0 deletions.
1 change: 1 addition & 0 deletions rust/worker/src/blockstore/arrow_blockfile/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod block;
mod blockfile;
mod provider;
mod sparse_index;
202 changes: 202 additions & 0 deletions rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use crate::blockstore::types::BlockfileKey;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use uuid::Uuid;

/// A sentinel blockfilekey wrapper to represent the start blocks range
#[derive(Clone, Debug)]
enum SparseIndexDelimiter {
Start,
Key(BlockfileKey),
}

impl PartialEq for SparseIndexDelimiter {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(SparseIndexDelimiter::Start, SparseIndexDelimiter::Start) => true,
(SparseIndexDelimiter::Key(k1), SparseIndexDelimiter::Key(k2)) => k1 == k2,
_ => false,
}
}
}

impl Eq for SparseIndexDelimiter {}

impl PartialOrd for SparseIndexDelimiter {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
match (self, other) {
(SparseIndexDelimiter::Start, SparseIndexDelimiter::Start) => {
Some(std::cmp::Ordering::Equal)
}
(SparseIndexDelimiter::Start, SparseIndexDelimiter::Key(_)) => {
Some(std::cmp::Ordering::Less)
}
(SparseIndexDelimiter::Key(_), SparseIndexDelimiter::Start) => {
Some(std::cmp::Ordering::Greater)
}
(SparseIndexDelimiter::Key(k1), SparseIndexDelimiter::Key(k2)) => k1.partial_cmp(k2),
}
}
}

impl Ord for SparseIndexDelimiter {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match (self, other) {
(SparseIndexDelimiter::Start, SparseIndexDelimiter::Start) => std::cmp::Ordering::Equal,
(SparseIndexDelimiter::Start, SparseIndexDelimiter::Key(_)) => std::cmp::Ordering::Less,
(SparseIndexDelimiter::Key(_), SparseIndexDelimiter::Start) => {
std::cmp::Ordering::Greater
}
(SparseIndexDelimiter::Key(k1), SparseIndexDelimiter::Key(k2)) => k1.cmp(k2),
}
}
}

/// A sparse index is used by a Blockfile to map a range of keys to a block id
/// # Methods
/// - `new` - Create a new sparse index with a single block
/// - `from` - Create a new sparse index from an existing one
/// - `get_target_block_id` - Get the block id for a given key
/// - `add_block` - Add a new block to the sparse index
/// - `replace_block` - Replace an existing block with a new one
/// - `len` - Get the number of blocks in the sparse index
/// - `is_valid` - Check if the sparse index is valid, useful for debugging and testing
pub(super) struct SparseIndex {
forward: BTreeMap<SparseIndexDelimiter, Uuid>,
reverse: HashMap<Uuid, SparseIndexDelimiter>,
}

impl SparseIndex {
pub(super) fn new(initial_block_id: Uuid) -> Self {
let mut forward = BTreeMap::new();
forward.insert(SparseIndexDelimiter::Start, initial_block_id);
let mut reverse = HashMap::new();
reverse.insert(initial_block_id, SparseIndexDelimiter::Start);
Self { forward, reverse }
}

pub(super) fn from(old_sparse_index: &SparseIndex) -> Self {
Self {
forward: old_sparse_index.forward.clone(),
reverse: old_sparse_index.reverse.clone(),
}
}

pub(super) fn get_target_block_id(&self, search_key: &BlockfileKey) -> Uuid {
let mut iter_curr = self.forward.iter();
let mut iter_next = self.forward.iter().skip(1);
let search_key = SparseIndexDelimiter::Key(search_key.clone());
while let Some((curr_key, curr_block_id)) = iter_curr.next() {
if let Some((next_key, _)) = iter_next.next() {
if search_key >= *curr_key && search_key < *next_key {
return *curr_block_id;
}
} else {
return *curr_block_id;
}
}
panic!("No blocks in the sparse index");
}

pub(super) fn add_block(&mut self, start_key: BlockfileKey, block_id: Uuid) {
self.forward
.insert(SparseIndexDelimiter::Key(start_key.clone()), block_id);
self.reverse
.insert(block_id, SparseIndexDelimiter::Key(start_key));
}

pub(super) fn replace_block(
&mut self,
old_block_id: Uuid,
new_block_id: Uuid,
new_start_key: BlockfileKey,
) {
if let Some(old_start_key) = self.reverse.remove(&old_block_id) {
self.forward.remove(&old_start_key);
if old_start_key == SparseIndexDelimiter::Start {
self.forward
.insert(SparseIndexDelimiter::Start, new_block_id);
} else {
self.forward
.insert(SparseIndexDelimiter::Key(new_start_key), new_block_id);
}
}
}

pub(super) fn len(&self) -> usize {
self.forward.len()
}

pub(super) fn is_valid(&self) -> bool {
let mut first = true;
let mut iter_curr = self.forward.iter();
let mut iter_next = self.forward.iter().skip(1);
while let Some((curr_key, _)) = iter_curr.next() {
if first {
if curr_key != &SparseIndexDelimiter::Start {
return false;
}
first = false;
}
if let Some((next_key, _)) = iter_next.next() {
if curr_key >= next_key {
return false;
}
}
}
true
}
}

impl Debug for SparseIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SparseIndex {{")?;
for (k, v) in self.forward.iter() {
write!(f, "\n {:?} -> {:?}", k, v)?;
}
write!(f, "\n}}")
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::blockstore::types::Key;

#[test]
fn test_sparse_index() {
let mut block_id_1 = uuid::Uuid::new_v4();
let mut sparse_index = SparseIndex::new(block_id_1);
let mut blockfile_key =
BlockfileKey::new("prefix".to_string(), Key::String("a".to_string()));
sparse_index.add_block(blockfile_key.clone(), block_id_1);
assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_1);

blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("b".to_string()));
assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_1);

// Split the range into two blocks (start, c), and (c, end)
let block_id_2 = uuid::Uuid::new_v4();
blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("c".to_string()));
sparse_index.add_block(blockfile_key.clone(), block_id_2);
assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_2);

// d should fall into the second block
blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("d".to_string()));
assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_2);

// Split the second block into (c, f) and (f, end)
let block_id_3 = uuid::Uuid::new_v4();
blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("f".to_string()));
sparse_index.add_block(blockfile_key.clone(), block_id_3);
assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_3);

// g should fall into the third block
blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("g".to_string()));
assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_3);

// b should fall into the first block
blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("b".to_string()));
assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_1);
}
}

0 comments on commit 76dc92c

Please sign in to comment.