From dbdb8485d448620e9a14cb8c0a71672d70a11766 Mon Sep 17 00:00:00 2001 From: hammadb Date: Fri, 8 Mar 2024 08:23:42 -0800 Subject: [PATCH] [ENH] Add Arrow-backed blockfile --- .../blockstore/arrow_blockfile/blockfile.rs | 143 +++++++++++------- rust/worker/src/blockstore/types.rs | 15 +- 2 files changed, 100 insertions(+), 58 deletions(-) diff --git a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs index eabdc148bdaf..006ef83fcb8e 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs @@ -1,15 +1,17 @@ use super::super::types::{Blockfile, BlockfileKey, Key, KeyType, Value, ValueType}; -use super::block::BlockState; +use super::block::{BlockError, BlockState}; use super::provider::ArrowBlockProvider; use super::sparse_index::SparseIndex; use crate::blockstore::arrow_blockfile::block::delta::BlockDelta; +use crate::blockstore::BlockfileError; +use crate::errors::ChromaError; use parking_lot::Mutex; use std::sync::Arc; +use thiserror::Error; use uuid::Uuid; pub(super) const MAX_BLOCK_SIZE: usize = 16384; -// TODO: Think about the clone here #[derive(Clone)] pub(crate) struct ArrowBlockfile { key_type: KeyType, @@ -48,16 +50,36 @@ impl TransactionState { } } +#[derive(Error, Debug)] +pub(crate) enum ArrowBlockfileError { + #[error("Block not found")] + BlockNotFoundError, + #[error("Block Error")] + BlockError(#[from] BlockError), + #[error("No split key found")] + NoSplitKeyFound, +} + +impl ChromaError for ArrowBlockfileError { + fn code(&self) -> crate::errors::ErrorCodes { + match self { + ArrowBlockfileError::BlockNotFoundError => crate::errors::ErrorCodes::NotFound, + ArrowBlockfileError::BlockError(err) => err.code(), + ArrowBlockfileError::NoSplitKeyFound => crate::errors::ErrorCodes::Internal, + } + } +} + impl Blockfile for ArrowBlockfile { - fn get(&self, key: BlockfileKey) -> Result<Value, Box<dyn crate::errors::ChromaError>> { + fn get(&self, key: 
BlockfileKey) -> Result<Value, Box<dyn ChromaError>> { let target_block_id = self.sparse_index.lock().get_target_block_id(&key); let target_block = match self.block_provider.get_block(&target_block_id) { - None => panic!("Block not found"), // TODO: This should not panic tbh + None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)), Some(block) => block, }; let value = target_block.get(&key); match value { - None => panic!("Key not found"), // TODO: This should not panic tbh + None => return Err(Box::new(BlockfileError::NotFoundError)), Some(value) => Ok(value), } } @@ -109,24 +131,24 @@ impl Blockfile for ArrowBlockfile { // TODO: value must be smaller than the block size except for position lists, which are a special case // where we split the value across multiple blocks if !self.in_transaction() { - panic!("Transaction not in progress"); + return Err(Box::new(BlockfileError::TransactionNotInProgress)); } // Validate key type match key.key { Key::String(_) => { if self.key_type != KeyType::String { - panic!("Invalid key type"); + return Err(Box::new(BlockfileError::InvalidKeyType)); } } Key::Float(_) => { if self.key_type != KeyType::Float { - panic!("Invalid key type"); + return Err(Box::new(BlockfileError::InvalidKeyType)); } } Key::Bool(_) => { if self.key_type != KeyType::Bool { - panic!("Invalid key type"); + return Err(Box::new(BlockfileError::InvalidKeyType)); } } } @@ -135,33 +157,33 @@ impl Blockfile for ArrowBlockfile { match value { Value::Int32ArrayValue(_) => { if self.value_type != ValueType::Int32Array { - panic!("Invalid value type"); + return Err(Box::new(BlockfileError::InvalidValueType)); } } Value::StringValue(_) => { if self.value_type != ValueType::String { - panic!("Invalid value type"); + return Err(Box::new(BlockfileError::InvalidValueType)); } } Value::Int32Value(_) => { if self.value_type != ValueType::Int32 { - panic!("Invalid value type"); + return Err(Box::new(BlockfileError::InvalidValueType)); } } Value::PositionalPostingListValue(_) => { if 
self.value_type != ValueType::PositionalPostingList { - panic!("Invalid value type"); + return Err(Box::new(BlockfileError::InvalidValueType)); } } Value::RoaringBitmapValue(_) => { if self.value_type != ValueType::RoaringBitmap { - panic!("Invalid value type"); + return Err(Box::new(BlockfileError::InvalidValueType)); } } } let transaction_state = match &self.transaction_state { - None => panic!("Transaction not in progress"), + None => return Err(Box::new(BlockfileError::TransactionNotInProgress)), Some(transaction_state) => transaction_state, }; @@ -174,7 +196,7 @@ impl Blockfile for ArrowBlockfile { let delta = match transaction_state.get_delta_for_block(&target_block_id) { None => { let target_block = match self.block_provider.get_block(&target_block_id) { - None => panic!("Block not found"), // TODO: This should not panic tbh + None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)), Some(block) => block, }; let delta = BlockDelta::from(target_block); @@ -190,7 +212,7 @@ impl Blockfile for ArrowBlockfile { let (split_key, new_delta) = delta.split(&self.block_provider); match *transaction_sparse_index { None => { - let mut new_sparse_index = + let new_sparse_index = Arc::new(Mutex::new(SparseIndex::from(&self.sparse_index.lock()))); new_sparse_index .lock() @@ -212,8 +234,7 @@ impl Blockfile for ArrowBlockfile { fn begin_transaction(&mut self) -> Result<(), Box<dyn crate::errors::ChromaError>> { if self.in_transaction() { - // TODO: return error - panic!("Transaction already in progress"); + return Err(Box::new(BlockfileError::TransactionInProgress)); } self.transaction_state = Some(Arc::new(TransactionState::new())); Ok(()) @@ -221,11 +242,11 @@ impl Blockfile for ArrowBlockfile { fn commit_transaction(&mut self) -> Result<(), Box<dyn crate::errors::ChromaError>> { if !self.in_transaction() { - panic!("Transaction not in progress"); + return Err(Box::new(BlockfileError::TransactionNotInProgress)); } - let mut transaction_state = match self.transaction_state { - None => panic!("Transaction not in progress"), 
+ let transaction_state = match self.transaction_state { + None => return Err(Box::new(BlockfileError::TransactionNotInProgress)), Some(ref transaction_state) => transaction_state, }; @@ -235,37 +256,53 @@ impl Blockfile for ArrowBlockfile { match delta.source_block.get_state() { BlockState::Uninitialized => { - delta.source_block.apply_delta(&delta); - delta.source_block.commit(); - println!( - "Size of commited block in bytes: {} with len {}", - delta.source_block.get_size(), - delta.source_block.len() - ); + match delta.source_block.apply_delta(&delta) { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } + match delta.source_block.commit() { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } } BlockState::Initialized => { - delta.source_block.apply_delta(&delta); - delta.source_block.commit(); - println!( - "Size of commited block in bytes: {} with len {}", - delta.source_block.get_size(), - delta.source_block.len() - ); + match delta.source_block.apply_delta(&delta) { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } + match delta.source_block.commit() { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } } BlockState::Commited | BlockState::Registered => { // If the block is commited or registered, we need to create a new block and update the sparse index let new_block = self .block_provider .create_block(self.key_type, self.value_type); - new_block.apply_delta(&delta); + match new_block.apply_delta(&delta) { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } let new_min_key = match delta.get_min_key() { - None => panic!("No start key"), + // This should never happen. 
We don't panic here because we want to return a proper error + None => return Err(Box::new(ArrowBlockfileError::NoSplitKeyFound)), Some(key) => key, }; let mut transaction_sparse_index = transaction_state.new_sparse_index.lock(); match *transaction_sparse_index { None => { - let mut new_sparse_index = + let new_sparse_index = Arc::new(Mutex::new(SparseIndex::from(&self.sparse_index.lock()))); new_sparse_index.lock().replace_block( delta.source_block.get_id(), @@ -282,12 +319,12 @@ impl Blockfile for ArrowBlockfile { ); } } - new_block.commit(); - println!( - "Size of commited block in bytes: {} with len {}", - new_block.get_size(), - new_block.len() - ); + match new_block.commit() { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } } } } @@ -502,24 +539,16 @@ mod tests { } blockfile.commit_transaction().unwrap(); - // Print size of each block - for block_id in blockfile.sparse_index.lock().block_ids() { - let block = blockfile.block_provider.get_block(block_id).unwrap(); - println!( - "Size of block {} in bytes: {} with len {}", - block_id, - block.get_size(), - block.len() - ); - } - for i in 0..n { let key = BlockfileKey::new("key".to_string(), Key::String(format!("{:04}", i))); let res = blockfile.get(key).unwrap(); match res { Value::RoaringBitmapValue(bitmap) => { assert_eq!(bitmap.len(), i as u64); - // TODO: check it contains the right values + assert_eq!( + bitmap.iter().collect::<Vec<u32>>(), + (0..i).collect::<Vec<u32>>() + ); } _ => panic!("Unexpected value type"), } diff --git a/rust/worker/src/blockstore/types.rs b/rust/worker/src/blockstore/types.rs index 2ed1c9699892..e9196da75d45 100644 --- a/rust/worker/src/blockstore/types.rs +++ b/rust/worker/src/blockstore/types.rs @@ -13,12 +13,25 @@ use thiserror::Error; pub(crate) enum BlockfileError { #[error("Key not found")] NotFoundError, + #[error("Invalid Key Type")] + InvalidKeyType, + #[error("Invalid Value Type")] + InvalidValueType, + #[error("Transaction already in 
progress")] + TransactionInProgress, + #[error("Transaction not in progress")] + TransactionNotInProgress, } impl ChromaError for BlockfileError { fn code(&self) -> ErrorCodes { match self { - BlockfileError::NotFoundError => ErrorCodes::InvalidArgument, + BlockfileError::NotFoundError + | BlockfileError::InvalidKeyType + | BlockfileError::InvalidValueType => ErrorCodes::InvalidArgument, + BlockfileError::TransactionInProgress | BlockfileError::TransactionNotInProgress => { + ErrorCodes::FailedPrecondition + } } } }