Skip to content

Commit

Permalink
[ENH] Add Arrow-backed blockfile (chroma-core#1846)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Adds roaring bitmap value types, builders, etc.
 - New functionality
	 - Adds the base of the blockfile implementation for the arrow-backed blockfile.

## Test plan
*How are these changes tested?*
Basic
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
HammadB authored and atroyn committed Apr 3, 2024
1 parent 36731e9 commit 609b46e
Show file tree
Hide file tree
Showing 6 changed files with 675 additions and 9 deletions.
43 changes: 38 additions & 5 deletions rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl BlockDeltaInner {
self.new_data.iter().fold(0, |acc, (_, value)| match value {
Value::Int32ArrayValue(arr) => acc + arr.len(),
Value::StringValue(s) => acc + s.len(),
Value::RoaringBitmapValue(bitmap) => acc + bitmap.serialized_size(),
_ => unimplemented!("Value type not implemented"),
})
}
Expand All @@ -225,12 +226,16 @@ impl BlockDeltaInner {
bit_util::round_upto_multiple_of_64(value_data_size) + value_offset_size;
let total_future_size = prefix_total_bytes + key_total_bytes + value_total_bytes;

if total_future_size > MAX_BLOCK_SIZE {
return false;
}

total_future_size <= MAX_BLOCK_SIZE
}

fn offset_size_for_value_type(&self, item_count: usize, value_type: ValueType) -> usize {
match value_type {
ValueType::Int32Array | ValueType::String => {
ValueType::Int32Array | ValueType::String | ValueType::RoaringBitmap => {
bit_util::round_upto_multiple_of_64((item_count + 1) * 4)
}
_ => unimplemented!("Value type not implemented"),
Expand Down Expand Up @@ -262,9 +267,11 @@ impl BlockDeltaInner {
let mut running_key_size = 0;
let mut running_value_size = 0;
let mut running_count = 0;
let mut split_key = None;

// The split key will be the last key that pushes the block over the half size. Not the first key that pushes it over
for (key, value) in self.new_data.iter() {
let mut split_key = None;
let mut iter = self.new_data.iter();
while let Some((key, value)) = iter.next() {
running_prefix_size += key.get_prefix_size();
running_key_size += key.key.get_size();
running_value_size += value.get_size();
Expand All @@ -277,10 +284,14 @@ impl BlockDeltaInner {
key_type,
value_type,
);
if half_size < current_size {
if current_size > half_size {
let next = iter.next();
match next {
Some((next_key, _)) => split_key = Some(next_key.clone()),
None => split_key = Some(key.clone()),
}
break;
}
split_key = Some(key.clone());
}

match &split_key {
Expand Down Expand Up @@ -399,4 +410,26 @@ mod test {
let block_data = BlockData::try_from(&delta).unwrap();
assert_eq!(size, block_data.get_size());
}

#[test]
fn test_sizing_roaring_bitmap_val() {
    // Verifies that the size a delta predicts for roaring-bitmap values equals
    // the size of the block data actually built from it, then exercises `split`.
    let provider = ArrowBlockProvider::new();
    let source_block = provider.create_block(KeyType::String, ValueType::RoaringBitmap);
    let delta = BlockDelta::from(source_block.clone());

    // Entry `i` stores a bitmap holding the integers 0..i, so value sizes grow
    // with the key index.
    const ENTRY_COUNT: u32 = 2000;
    for i in 0..ENTRY_COUNT {
        let key = BlockfileKey::new("key".to_string(), Key::String(format!("{:04}", i)));
        let bitmap: roaring::RoaringBitmap = (0..i).collect();
        delta.add(key, Value::RoaringBitmapValue(bitmap));
    }

    let predicted_size = delta.get_size();
    let built = BlockData::try_from(&delta).unwrap();
    assert_eq!(predicted_size, built.get_size());

    // Only checks that splitting does not panic; the results are not inspected.
    let (_split_key, _new_delta) = delta.split(&provider);
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/blockstore/arrow_blockfile/block/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod delta;
mod iterator;
mod types;

pub(in crate::blockstore::arrow_blockfile) mod delta;
// Re-export types at the arrow_blockfile module level
pub(in crate::blockstore::arrow_blockfile) use types::*;
45 changes: 44 additions & 1 deletion rust/worker/src/blockstore/arrow_blockfile/block/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::blockstore::types::{BlockfileKey, Key, KeyType, Value, ValueType};
use crate::errors::{ChromaError, ErrorCodes};
use arrow::array::{BooleanArray, BooleanBuilder, Float32Array, Float32Builder};
use arrow::array::{
BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, Float32Array, Float32Builder,
GenericByteBuilder,
};
use arrow::{
array::{Array, Int32Array, Int32Builder, ListArray, ListBuilder, StringArray, StringBuilder},
datatypes::{DataType, Field},
record_batch::RecordBatch,
};
use parking_lot::RwLock;
use std::io::Error;
use std::sync::Arc;
use thiserror::Error;
use uuid::Uuid;
Expand Down Expand Up @@ -147,6 +151,21 @@ impl Block {
.to_string(),
))
}
ValueType::RoaringBitmap => {
let bytes = value
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.value(i);
let bitmap = roaring::RoaringBitmap::deserialize_from(bytes);
match bitmap {
Ok(bitmap) => {
return Some(Value::RoaringBitmapValue(bitmap))
}
// TODO: log error
Err(_) => return None,
}
}
// TODO: Add support for other types
_ => unimplemented!(),
}
Expand Down Expand Up @@ -271,6 +290,7 @@ enum KeyBuilder {
/// Arrow array builder for the value column of a block, with one variant per
/// supported value type.
enum ValueBuilder {
/// Builds a list column whose items are `Int32` values.
Int32ArrayValueBuilder(ListBuilder<Int32Builder>),
/// Builds a string column.
StringValueBuilder(StringBuilder),
/// Builds a binary column; each roaring bitmap is serialized to bytes
/// before being appended.
RoaringBitmapBuilder(BinaryBuilder),
}

/// BlockDataBuilder is used to build a block. It is used to add data to a block and then build the BlockData once all data has been added.
Expand Down Expand Up @@ -359,6 +379,9 @@ impl BlockDataBuilder {
options.item_count,
options.total_value_capacity,
)),
ValueType::RoaringBitmap => ValueBuilder::RoaringBitmapBuilder(
BinaryBuilder::with_capacity(options.item_count, options.total_value_capacity),
),
// TODO: Implement the other value types
_ => unimplemented!(),
};
Expand Down Expand Up @@ -420,6 +443,18 @@ impl BlockDataBuilder {
}
_ => unreachable!("Invalid value type for block"),
},
ValueBuilder::RoaringBitmapBuilder(ref mut builder) => match value {
Value::RoaringBitmapValue(bitmap) => {
let mut bytes = Vec::with_capacity(bitmap.serialized_size());
match bitmap.serialize_into(&mut bytes) {
Ok(_) => builder.append_value(&bytes),
Err(e) => {
return Err(Box::new(BlockDataAddError::RoaringBitmapError(e)));
}
}
}
_ => unreachable!("Invalid value type for block"),
},
}

Ok(())
Expand Down Expand Up @@ -464,6 +499,11 @@ impl BlockDataBuilder {
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
ValueBuilder::RoaringBitmapBuilder(ref mut builder) => {
value_field = Field::new("value", DataType::Binary, true);
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
};

let schema = Arc::new(arrow::datatypes::Schema::new(vec![
Expand All @@ -484,12 +524,15 @@ impl BlockDataBuilder {
/// Errors that can be reported while adding an entry to a block being built.
pub enum BlockDataAddError {
/// A key was rejected for being out of order, per the error message.
/// NOTE(review): the ordering check itself is not visible here — confirm
/// the exact condition against the builder's `add` implementation.
#[error("Blockfile key not in order")]
KeyNotInOrder,
/// Serializing a roaring bitmap into bytes failed. Wraps the underlying
/// `std::io::Error`; `#[from]` lets that error convert into this variant.
#[error("Roaring bitmap error")]
RoaringBitmapError(#[from] Error),
}

impl ChromaError for BlockDataAddError {
    /// Maps each add-error variant to its error code: an out-of-order key is
    /// an invalid argument, while a bitmap serialization failure is internal.
    fn code(&self) -> ErrorCodes {
        match self {
            Self::RoaringBitmapError(_) => ErrorCodes::Internal,
            Self::KeyNotInOrder => ErrorCodes::InvalidArgument,
        }
    }
}
Expand Down
Loading

0 comments on commit 609b46e

Please sign in to comment.