diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs b/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs index 676cdbd45f9..52c0ba48daf 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs @@ -201,6 +201,7 @@ impl BlockDeltaInner { self.new_data.iter().fold(0, |acc, (_, value)| match value { Value::Int32ArrayValue(arr) => acc + arr.len(), Value::StringValue(s) => acc + s.len(), + Value::RoaringBitmapValue(bitmap) => acc + bitmap.serialized_size(), _ => unimplemented!("Value type not implemented"), }) } @@ -225,12 +226,16 @@ impl BlockDeltaInner { bit_util::round_upto_multiple_of_64(value_data_size) + value_offset_size; let total_future_size = prefix_total_bytes + key_total_bytes + value_total_bytes; + if total_future_size > MAX_BLOCK_SIZE { + return false; + } + total_future_size <= MAX_BLOCK_SIZE } fn offset_size_for_value_type(&self, item_count: usize, value_type: ValueType) -> usize { match value_type { - ValueType::Int32Array | ValueType::String => { + ValueType::Int32Array | ValueType::String | ValueType::RoaringBitmap => { bit_util::round_upto_multiple_of_64((item_count + 1) * 4) } _ => unimplemented!("Value type not implemented"), @@ -262,9 +267,11 @@ impl BlockDeltaInner { let mut running_key_size = 0; let mut running_value_size = 0; let mut running_count = 0; - let mut split_key = None; + // The split key will be the last key that pushes the block over the half size. Not the first key that pushes it over - for (key, value) in self.new_data.iter() { + let mut split_key = None; + let mut iter = self.new_data.iter(); + while let Some((key, value)) = iter.next() { running_prefix_size += key.get_prefix_size(); running_key_size += key.key.get_size(); running_value_size += value.get_size(); @@ -277,10 +284,14 @@ impl BlockDeltaInner { key_type, value_type, ); - if half_size < current_size { + if current_size > half_size { + let next = iter.next(); + match next { + Some((next_key, _)) => split_key = Some(next_key.clone()), + None => split_key = Some(key.clone()), + } break; } - split_key = Some(key.clone()); } match &split_key { @@ -399,4 +410,26 @@ mod test { let block_data = BlockData::try_from(&delta).unwrap(); assert_eq!(size, block_data.get_size()); } + + #[test] + fn test_sizing_roaring_bitmap_val() { + let block_provider = ArrowBlockProvider::new(); + let block = block_provider.create_block(KeyType::String, ValueType::RoaringBitmap); + let delta = BlockDelta::from(block.clone()); + + let n = 2000; + for i in 0..n { + let key = BlockfileKey::new("key".to_string(), Key::String(format!("{:04}", i))); + let value = Value::RoaringBitmapValue(roaring::RoaringBitmap::from_iter( + (0..i).map(|x| x as u32), + )); + delta.add(key, value); + } + + let size = delta.get_size(); + let block_data = BlockData::try_from(&delta).unwrap(); + assert_eq!(size, block_data.get_size()); + + let (split_key, delta) = delta.split(&block_provider); + } } diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs b/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs index 4ac656dc86b..d867bd65040 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs @@ -1,6 +1,6 @@ -mod delta; mod iterator; mod types; +pub(in crate::blockstore::arrow_blockfile) mod delta; // Re-export types at the arrow_blockfile module level pub(in crate::blockstore::arrow_blockfile) use types::*; diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/types.rs b/rust/worker/src/blockstore/arrow_blockfile/block/types.rs index da6379735b9..9c9d4f5ee1e 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/block/types.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/block/types.rs @@ -1,12 +1,16 @@ use crate::blockstore::types::{BlockfileKey, Key, KeyType, Value, ValueType}; use crate::errors::{ChromaError, ErrorCodes}; -use arrow::array::{BooleanArray, BooleanBuilder, Float32Array, Float32Builder}; +use arrow::array::{ + BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, Float32Array, Float32Builder, + GenericByteBuilder, +}; use arrow::{ array::{Array, Int32Array, Int32Builder, ListArray, ListBuilder, StringArray, StringBuilder}, datatypes::{DataType, Field}, record_batch::RecordBatch, }; use parking_lot::RwLock; +use std::io::Error; use std::sync::Arc; use thiserror::Error; use uuid::Uuid; @@ -147,6 +151,21 @@ impl Block { .to_string(), )) } + ValueType::RoaringBitmap => { + let bytes = value + .as_any() + .downcast_ref::() + .unwrap() + .value(i); + let bitmap = roaring::RoaringBitmap::deserialize_from(bytes); + match bitmap { + Ok(bitmap) => { + return Some(Value::RoaringBitmapValue(bitmap)) + } + // TODO: log error + Err(_) => return None, + } + } // TODO: Add support for other types _ => unimplemented!(), } @@ -271,6 +290,7 @@ enum KeyBuilder { enum ValueBuilder { Int32ArrayValueBuilder(ListBuilder), StringValueBuilder(StringBuilder), + RoaringBitmapBuilder(BinaryBuilder), } /// BlockDataBuilder is used to build a block. It is used to add data to a block and then build the BlockData once all data has been added. @@ -359,6 +379,9 @@ impl BlockDataBuilder { options.item_count, options.total_value_capacity, )), + ValueType::RoaringBitmap => ValueBuilder::RoaringBitmapBuilder( + BinaryBuilder::with_capacity(options.item_count, options.total_value_capacity), + ), // TODO: Implement the other value types _ => unimplemented!(), }; @@ -420,6 +443,18 @@ impl BlockDataBuilder { } _ => unreachable!("Invalid value type for block"), }, + ValueBuilder::RoaringBitmapBuilder(ref mut builder) => match value { + Value::RoaringBitmapValue(bitmap) => { + let mut bytes = Vec::with_capacity(bitmap.serialized_size()); + match bitmap.serialize_into(&mut bytes) { + Ok(_) => builder.append_value(&bytes), + Err(e) => { + return Err(Box::new(BlockDataAddError::RoaringBitmapError(e))); + } + } + } + _ => unreachable!("Invalid value type for block"), + }, } Ok(()) @@ -464,6 +499,11 @@ impl BlockDataBuilder { let arr = builder.finish(); (&arr as &dyn Array).slice(0, arr.len()) } + ValueBuilder::RoaringBitmapBuilder(ref mut builder) => { + value_field = Field::new("value", DataType::Binary, true); + let arr = builder.finish(); + (&arr as &dyn Array).slice(0, arr.len()) + } }; let schema = Arc::new(arrow::datatypes::Schema::new(vec![ @@ -484,12 +524,15 @@ impl BlockDataBuilder { pub enum BlockDataAddError { #[error("Blockfile key not in order")] KeyNotInOrder, + #[error("Roaring bitmap error")] + RoaringBitmapError(#[from] Error), } impl ChromaError for BlockDataAddError { fn code(&self) -> ErrorCodes { match self { BlockDataAddError::KeyNotInOrder => ErrorCodes::InvalidArgument, + BlockDataAddError::RoaringBitmapError(_) => ErrorCodes::Internal, } } } diff --git a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs index 507455e2e2c..6a5c2fc296d 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs @@ -1 +1,573 @@ +use super::super::types::{Blockfile, BlockfileKey, Key, KeyType, Value, ValueType}; +use super::block::{BlockError, BlockState}; +use super::provider::ArrowBlockProvider; +use super::sparse_index::SparseIndex; +use crate::blockstore::arrow_blockfile::block::delta::BlockDelta; +use crate::blockstore::BlockfileError; +use crate::errors::ChromaError; +use parking_lot::Mutex; +use std::sync::Arc; +use thiserror::Error; +use uuid::Uuid; + pub(super) const MAX_BLOCK_SIZE: usize = 16384; + +/// ArrowBlockfile is a blockfile implementation that uses Apache Arrow for the block storage. +/// It stores a sparse index over a set of blocks sorted by key. +/// It uses a block provider to create new blocks and to retrieve existing blocks. +#[derive(Clone)] +pub(crate) struct ArrowBlockfile { + key_type: KeyType, + value_type: ValueType, + block_provider: ArrowBlockProvider, + sparse_index: Arc>, + transaction_state: Option>, +} + +/// TransactionState is a helper struct to keep track of the state of a transaction. +/// It keeps a list of block deltas that are applied during the transaction and the new +/// sparse index that is created during the transaction. The sparse index is immutable +/// so we can replace the sparse index of the blockfile with the new sparse index after +/// the transaction is committed. +struct TransactionState { + block_delta: Mutex>, + sparse_index: Mutex>>>, +} + +impl TransactionState { + fn new() -> Self { + Self { + block_delta: Mutex::new(Vec::new()), + sparse_index: Mutex::new(None), + } + } + + /// Add a new block delta to the transaction state + fn add_delta(&self, delta: BlockDelta) { + let mut block_delta = self.block_delta.lock(); + block_delta.push(delta); + } + + /// Get the block delta for a specific block id + fn get_delta_for_block(&self, search_id: &Uuid) -> Option { + let block_delta = self.block_delta.lock(); + for delta in &*block_delta { + if delta.source_block.get_id() == *search_id { + return Some(delta.clone()); + } + } + None + } +} + +#[derive(Error, Debug)] +pub(crate) enum ArrowBlockfileError { + #[error("Block not found")] + BlockNotFoundError, + #[error("Block Error")] + BlockError(#[from] BlockError), + #[error("No split key found")] + NoSplitKeyFound, +} + +impl ChromaError for ArrowBlockfileError { + fn code(&self) -> crate::errors::ErrorCodes { + match self { + ArrowBlockfileError::BlockNotFoundError => crate::errors::ErrorCodes::NotFound, + ArrowBlockfileError::BlockError(err) => err.code(), + ArrowBlockfileError::NoSplitKeyFound => crate::errors::ErrorCodes::Internal, + } + } +} + +impl Blockfile for ArrowBlockfile { + fn get(&self, key: BlockfileKey) -> Result> { + let target_block_id = self.sparse_index.lock().get_target_block_id(&key); + let target_block = match self.block_provider.get_block(&target_block_id) { + None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)), + Some(block) => block, + }; + let value = target_block.get(&key); + match value { + None => return Err(Box::new(BlockfileError::NotFoundError)), + Some(value) => Ok(value), + } + } + + fn get_by_prefix( + &self, + prefix: String, + ) -> Result, Box> { + unimplemented!(); + } + + fn get_gt( + &self, + prefix: String, + key: Key, + ) -> Result, Box> { + unimplemented!(); + } + + fn get_gte( + &self, + prefix: String, + key: Key, + ) -> Result, Box> { + unimplemented!(); + } + + fn get_lt( + &self, + prefix: String, + key: Key, + ) -> Result, Box> { + unimplemented!(); + } + + fn get_lte( + &self, + prefix: String, + key: Key, + ) -> Result, Box> { + unimplemented!(); + } + + fn set( + &mut self, + key: BlockfileKey, + value: Value, + ) -> Result<(), Box> { + // TODO: value must be smaller than the block size except for position lists, which are a special case + // where we split the value across multiple blocks + if !self.in_transaction() { + return Err(Box::new(BlockfileError::TransactionNotInProgress)); + } + + // Validate key type + match key.key { + Key::String(_) => { + if self.key_type != KeyType::String { + return Err(Box::new(BlockfileError::InvalidKeyType)); + } + } + Key::Float(_) => { + if self.key_type != KeyType::Float { + return Err(Box::new(BlockfileError::InvalidKeyType)); + } + } + Key::Bool(_) => { + if self.key_type != KeyType::Bool { + return Err(Box::new(BlockfileError::InvalidKeyType)); + } + } + } + + // Validate value type + match value { + Value::Int32ArrayValue(_) => { + if self.value_type != ValueType::Int32Array { + return Err(Box::new(BlockfileError::InvalidValueType)); + } + } + Value::StringValue(_) => { + if self.value_type != ValueType::String { + return Err(Box::new(BlockfileError::InvalidValueType)); + } + } + Value::Int32Value(_) => { + if self.value_type != ValueType::Int32 { + return Err(Box::new(BlockfileError::InvalidValueType)); + } + } + Value::PositionalPostingListValue(_) => { + if self.value_type != ValueType::PositionalPostingList { + return Err(Box::new(BlockfileError::InvalidValueType)); + } + } + Value::RoaringBitmapValue(_) => { + if self.value_type != ValueType::RoaringBitmap { + return Err(Box::new(BlockfileError::InvalidValueType)); + } + } + } + + let transaction_state = match &self.transaction_state { + None => return Err(Box::new(BlockfileError::TransactionNotInProgress)), + Some(transaction_state) => transaction_state, + }; + + // Get the target block id for the key + let mut transaction_sparse_index = transaction_state.sparse_index.lock(); + let target_block_id = match *transaction_sparse_index { + None => self.sparse_index.lock().get_target_block_id(&key), + Some(ref index) => index.lock().get_target_block_id(&key), + }; + + // See if a delta for the target block already exists, if not create a new one and add it to the transaction state + // Creating a delta loads the block entirely into memory + let delta = match transaction_state.get_delta_for_block(&target_block_id) { + None => { + let target_block = match self.block_provider.get_block(&target_block_id) { + None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)), + Some(block) => block, + }; + let delta = BlockDelta::from(target_block); + transaction_state.add_delta(delta.clone()); + delta + } + Some(delta) => delta, + }; + + // Check if we can add to the the delta without pushing the block over the max size. + // If we can't, we need to split the block and create a new delta + if delta.can_add(&key, &value) { + delta.add(key, value); + } else { + let (split_key, new_delta) = delta.split(&self.block_provider); + match *transaction_sparse_index { + None => { + let new_sparse_index = + Arc::new(Mutex::new(SparseIndex::from(&self.sparse_index.lock()))); + new_sparse_index + .lock() + .add_block(split_key, new_delta.source_block.get_id()); + *transaction_sparse_index = Some(new_sparse_index); + } + Some(ref index) => { + index + .lock() + .add_block(split_key, new_delta.source_block.get_id()); + } + } + transaction_state.add_delta(new_delta); + drop(transaction_sparse_index); + // Recursive call to add to the new appropriate delta + self.set(key, value)? + } + Ok(()) + } + + fn begin_transaction(&mut self) -> Result<(), Box> { + if self.in_transaction() { + return Err(Box::new(BlockfileError::TransactionInProgress)); + } + self.transaction_state = Some(Arc::new(TransactionState::new())); + Ok(()) + } + + fn commit_transaction(&mut self) -> Result<(), Box> { + if !self.in_transaction() { + return Err(Box::new(BlockfileError::TransactionNotInProgress)); + } + + let transaction_state = match self.transaction_state { + None => return Err(Box::new(BlockfileError::TransactionNotInProgress)), + Some(ref transaction_state) => transaction_state, + }; + + for delta in &*transaction_state.block_delta.lock() { + // Blocks are WORM, so if the block is uninitialized or initialized we can update it directly, if its registered, meaning the broader system is aware of it, + // we need to create a new block and update the sparse index to point to the new block + + match delta.source_block.get_state() { + BlockState::Uninitialized => { + match delta.source_block.apply_delta(&delta) { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } + match delta.source_block.commit() { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } + } + BlockState::Initialized => { + match delta.source_block.apply_delta(&delta) { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } + match delta.source_block.commit() { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } + } + BlockState::Commited | BlockState::Registered => { + // If the block is commited or registered, we need to create a new block and update the sparse index + let new_block = self + .block_provider + .create_block(self.key_type, self.value_type); + match new_block.apply_delta(&delta) { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } + let new_min_key = match delta.get_min_key() { + // This should never happen. We don't panic here because we want to return a proper error + None => return Err(Box::new(ArrowBlockfileError::NoSplitKeyFound)), + Some(key) => key, + }; + let mut transaction_sparse_index = transaction_state.sparse_index.lock(); + match *transaction_sparse_index { + None => { + let new_sparse_index = + Arc::new(Mutex::new(SparseIndex::from(&self.sparse_index.lock()))); + new_sparse_index.lock().replace_block( + delta.source_block.get_id(), + new_block.get_id(), + new_min_key, + ); + *transaction_sparse_index = Some(new_sparse_index); + } + Some(ref index) => { + index.lock().replace_block( + delta.source_block.get_id(), + new_block.get_id(), + new_min_key, + ); + } + } + match new_block.commit() { + Ok(_) => {} + Err(err) => { + return Err(Box::new(ArrowBlockfileError::BlockError(*err))); + } + } + } + } + } + + // update the sparse index + let mut transaction_state_sparse_index = transaction_state.sparse_index.lock(); + if transaction_state_sparse_index.is_some() { + self.sparse_index = transaction_state_sparse_index.take().unwrap(); + // unwrap is safe because we just checked it + } + + // Reset the transaction state + drop(transaction_state_sparse_index); + self.transaction_state = None; + Ok(()) + } +} + +impl ArrowBlockfile { + pub(super) fn new( + key_type: KeyType, + value_type: ValueType, + block_provider: ArrowBlockProvider, + ) -> Self { + let initial_block = block_provider.create_block(key_type.clone(), value_type.clone()); + Self { + sparse_index: Arc::new(Mutex::new(SparseIndex::new(initial_block.get_id()))), + transaction_state: None, + block_provider, + key_type, + value_type, + } + } + + fn in_transaction(&self) -> bool { + self.transaction_state.is_some() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int32Array; + + #[test] + fn test_blockfile() { + let block_provider = ArrowBlockProvider::new(); + let mut blockfile = + ArrowBlockfile::new(KeyType::String, ValueType::Int32Array, block_provider); + + blockfile.begin_transaction().unwrap(); + let key1 = BlockfileKey::new("key".to_string(), Key::String("zzzz".to_string())); + blockfile + .set( + key1.clone(), + Value::Int32ArrayValue(Int32Array::from(vec![1, 2, 3])), + ) + .unwrap(); + let key2 = BlockfileKey::new("key".to_string(), Key::String("aaaa".to_string())); + blockfile + .set( + key2, + Value::Int32ArrayValue(Int32Array::from(vec![4, 5, 6])), + ) + .unwrap(); + blockfile.commit_transaction().unwrap(); + + let value = blockfile.get(key1).unwrap(); + match value { + Value::Int32ArrayValue(array) => { + assert_eq!(array.values(), &[1, 2, 3]); + } + _ => panic!("Unexpected value type"), + } + } + + #[test] + fn test_splitting() { + let block_provider = ArrowBlockProvider::new(); + let mut blockfile = + ArrowBlockfile::new(KeyType::String, ValueType::Int32Array, block_provider); + + blockfile.begin_transaction().unwrap(); + let n = 1200; + for i in 0..n { + let string_key = format!("{:04}", i); + let key = BlockfileKey::new("key".to_string(), Key::String(string_key)); + blockfile + .set(key, Value::Int32ArrayValue(Int32Array::from(vec![i]))) + .unwrap(); + } + blockfile.commit_transaction().unwrap(); + + for i in 0..n { + let string_key = format!("{:04}", i); + let key = BlockfileKey::new("key".to_string(), Key::String(string_key)); + let res = blockfile.get(key).unwrap(); + match res { + Value::Int32ArrayValue(array) => { + assert_eq!(array.values(), &[i]); + } + _ => panic!("Unexpected value type"), + } + } + + // Sparse index should have 3 blocks + assert_eq!(blockfile.sparse_index.lock().len(), 3); + assert!(blockfile.sparse_index.lock().is_valid()); + + // Add 5 new entries to the first block + blockfile.begin_transaction().unwrap(); + for i in 0..5 { + let new_key = format! {"{:05}", i}; + let key = BlockfileKey::new("key".to_string(), Key::String(new_key)); + blockfile + .set(key, Value::Int32ArrayValue(Int32Array::from(vec![i]))) + .unwrap(); + } + blockfile.commit_transaction().unwrap(); + + // Sparse index should still have 3 blocks + assert_eq!(blockfile.sparse_index.lock().len(), 3); + assert!(blockfile.sparse_index.lock().is_valid()); + + // Add 1200 more entries, causing splits + blockfile.begin_transaction().unwrap(); + for i in n..n * 2 { + let new_key = format! {"{:04}", i}; + let key = BlockfileKey::new("key".to_string(), Key::String(new_key)); + blockfile + .set(key, Value::Int32ArrayValue(Int32Array::from(vec![i]))) + .unwrap(); + } + blockfile.commit_transaction().unwrap(); + } + + #[test] + fn test_string_value() { + let block_provider = ArrowBlockProvider::new(); + let mut blockfile = ArrowBlockfile::new(KeyType::String, ValueType::String, block_provider); + + blockfile.begin_transaction().unwrap(); + let n = 2000; + + for i in 0..n { + let string_key = format!("{:04}", i); + let key = BlockfileKey::new("key".to_string(), Key::String(string_key.clone())); + blockfile + .set(key, Value::StringValue(string_key.clone())) + .unwrap(); + } + blockfile.commit_transaction().unwrap(); + + for i in 0..n { + let string_key = format!("{:04}", i); + let key = BlockfileKey::new("key".to_string(), Key::String(string_key.clone())); + let res = blockfile.get(key).unwrap(); + match res { + Value::StringValue(string) => { + assert_eq!(string, string_key); + } + _ => panic!("Unexpected value type"), + } + } + } + + #[test] + fn test_int_key() { + let block_provider = ArrowBlockProvider::new(); + let mut blockfile = ArrowBlockfile::new(KeyType::Float, ValueType::String, block_provider); + + blockfile.begin_transaction().unwrap(); + let n = 2000; + for i in 0..n { + let key = BlockfileKey::new("key".to_string(), Key::Float(i as f32)); + blockfile + .set(key, Value::StringValue(format!("{:04}", i))) + .unwrap(); + } + blockfile.commit_transaction().unwrap(); + + for i in 0..n { + let key = BlockfileKey::new("key".to_string(), Key::Float(i as f32)); + let res = blockfile.get(key).unwrap(); + match res { + Value::StringValue(string) => { + assert_eq!(string, format!("{:04}", i)); + } + _ => panic!("Unexpected value type"), + } + } + } + + #[test] + fn test_roaring_bitmap_value() { + let block_provider = ArrowBlockProvider::new(); + let mut blockfile = + ArrowBlockfile::new(KeyType::String, ValueType::RoaringBitmap, block_provider); + + blockfile.begin_transaction().unwrap(); + let n = 2000; + for i in 0..n { + let key = BlockfileKey::new("key".to_string(), Key::String(format!("{:04}", i))); + blockfile + .set( + key, + Value::RoaringBitmapValue(roaring::RoaringBitmap::from_iter( + (0..i).map(|x| x as u32), + )), + ) + .unwrap(); + } + blockfile.commit_transaction().unwrap(); + + for i in 0..n { + let key = BlockfileKey::new("key".to_string(), Key::String(format!("{:04}", i))); + let res = blockfile.get(key).unwrap(); + match res { + Value::RoaringBitmapValue(bitmap) => { + assert_eq!(bitmap.len(), i as u64); + assert_eq!( + bitmap.iter().collect::>(), + (0..i).collect::>() + ); + } + _ => panic!("Unexpected value type"), + } + } + } +} diff --git a/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs b/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs index 42acf6d98ec..134252614ec 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs @@ -152,6 +152,11 @@ impl SparseIndex { } true } + + /// An iterator over the block uuids in the sparse index + pub(super) fn block_ids(&self) -> impl Iterator { + self.forward.values() + } } impl Debug for SparseIndex { diff --git a/rust/worker/src/blockstore/types.rs b/rust/worker/src/blockstore/types.rs index ca240999ebb..fd1f906e85d 100644 --- a/rust/worker/src/blockstore/types.rs +++ b/rust/worker/src/blockstore/types.rs @@ -13,12 +13,25 @@ use thiserror::Error; pub(crate) enum BlockfileError { #[error("Key not found")] NotFoundError, + #[error("Invalid Key Type")] + InvalidKeyType, + #[error("Invalid Value Type")] + InvalidValueType, + #[error("Transaction already in progress")] + TransactionInProgress, + #[error("Transaction not in progress")] + TransactionNotInProgress, } impl ChromaError for BlockfileError { fn code(&self) -> ErrorCodes { match self { - BlockfileError::NotFoundError => ErrorCodes::InvalidArgument, + BlockfileError::NotFoundError + | BlockfileError::InvalidKeyType + | BlockfileError::InvalidValueType => ErrorCodes::InvalidArgument, + BlockfileError::TransactionInProgress | BlockfileError::TransactionNotInProgress => { + ErrorCodes::FailedPrecondition + } } } } @@ -199,7 +212,7 @@ impl Value { unimplemented!("Size of positional posting list") } Value::StringValue(s) => s.len(), - Value::RoaringBitmapValue(bitmap) => unimplemented!("Size of roaring bitmap"), + Value::RoaringBitmapValue(bitmap) => bitmap.serialized_size(), Value::Int32Value(_) => 4, } }