Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add Arrow-backed blockfile #1846

Merged
merged 7 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl BlockDeltaInner {
self.new_data.iter().fold(0, |acc, (_, value)| match value {
Value::Int32ArrayValue(arr) => acc + arr.len(),
Value::StringValue(s) => acc + s.len(),
Value::RoaringBitmapValue(bitmap) => acc + bitmap.serialized_size(),
_ => unimplemented!("Value type not implemented"),
})
}
Expand All @@ -225,12 +226,16 @@ impl BlockDeltaInner {
bit_util::round_upto_multiple_of_64(value_data_size) + value_offset_size;
let total_future_size = prefix_total_bytes + key_total_bytes + value_total_bytes;

if total_future_size > MAX_BLOCK_SIZE {
return false;
}

total_future_size <= MAX_BLOCK_SIZE
}

fn offset_size_for_value_type(&self, item_count: usize, value_type: ValueType) -> usize {
match value_type {
ValueType::Int32Array | ValueType::String => {
ValueType::Int32Array | ValueType::String | ValueType::RoaringBitmap => {
bit_util::round_upto_multiple_of_64((item_count + 1) * 4)
}
_ => unimplemented!("Value type not implemented"),
Expand Down Expand Up @@ -262,9 +267,11 @@ impl BlockDeltaInner {
let mut running_key_size = 0;
let mut running_value_size = 0;
let mut running_count = 0;
let mut split_key = None;

// The split key will be the last key that pushes the block over the half size. Not the first key that pushes it over
for (key, value) in self.new_data.iter() {
let mut split_key = None;
let mut iter = self.new_data.iter();
while let Some((key, value)) = iter.next() {
running_prefix_size += key.get_prefix_size();
running_key_size += key.key.get_size();
running_value_size += value.get_size();
Expand All @@ -277,10 +284,14 @@ impl BlockDeltaInner {
key_type,
value_type,
);
if half_size < current_size {
if current_size > half_size {
let next = iter.next();
match next {
Some((next_key, _)) => split_key = Some(next_key.clone()),
None => split_key = Some(key.clone()),
}
break;
}
split_key = Some(key.clone());
}

match &split_key {
Expand Down Expand Up @@ -399,4 +410,26 @@ mod test {
let block_data = BlockData::try_from(&delta).unwrap();
assert_eq!(size, block_data.get_size());
}

#[test]
fn test_sizing_roaring_bitmap_val() {
let block_provider = ArrowBlockProvider::new();
let block = block_provider.create_block(KeyType::String, ValueType::RoaringBitmap);
let delta = BlockDelta::from(block.clone());

let n = 2000;
for i in 0..n {
let key = BlockfileKey::new("key".to_string(), Key::String(format!("{:04}", i)));
let value = Value::RoaringBitmapValue(roaring::RoaringBitmap::from_iter(
(0..i).map(|x| x as u32),
));
delta.add(key, value);
}

let size = delta.get_size();
let block_data = BlockData::try_from(&delta).unwrap();
assert_eq!(size, block_data.get_size());

let (split_key, delta) = delta.split(&block_provider);
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/blockstore/arrow_blockfile/block/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod delta;
mod iterator;
mod types;

pub(in crate::blockstore::arrow_blockfile) mod delta;
// Re-export types at the arrow_blockfile module level
pub(in crate::blockstore::arrow_blockfile) use types::*;
45 changes: 44 additions & 1 deletion rust/worker/src/blockstore/arrow_blockfile/block/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::blockstore::types::{BlockfileKey, Key, KeyType, Value, ValueType};
use crate::errors::{ChromaError, ErrorCodes};
use arrow::array::{BooleanArray, BooleanBuilder, Float32Array, Float32Builder};
use arrow::array::{
BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, Float32Array, Float32Builder,
GenericByteBuilder,
};
use arrow::{
array::{Array, Int32Array, Int32Builder, ListArray, ListBuilder, StringArray, StringBuilder},
datatypes::{DataType, Field},
record_batch::RecordBatch,
};
use parking_lot::RwLock;
use std::io::Error;
use std::sync::Arc;
use thiserror::Error;
use uuid::Uuid;
Expand Down Expand Up @@ -147,6 +151,21 @@ impl Block {
.to_string(),
))
}
ValueType::RoaringBitmap => {
let bytes = value
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.value(i);
let bitmap = roaring::RoaringBitmap::deserialize_from(bytes);
match bitmap {
Ok(bitmap) => {
return Some(Value::RoaringBitmapValue(bitmap))
}
// TODO: log error
Err(_) => return None,
}
}
// TODO: Add support for other types
_ => unimplemented!(),
}
Expand Down Expand Up @@ -271,6 +290,7 @@ enum KeyBuilder {
enum ValueBuilder {
Int32ArrayValueBuilder(ListBuilder<Int32Builder>),
StringValueBuilder(StringBuilder),
RoaringBitmapBuilder(BinaryBuilder),
}

/// BlockDataBuilder is used to build a block. It is used to add data to a block and then build the BlockData once all data has been added.
Expand Down Expand Up @@ -359,6 +379,9 @@ impl BlockDataBuilder {
options.item_count,
options.total_value_capacity,
)),
ValueType::RoaringBitmap => ValueBuilder::RoaringBitmapBuilder(
BinaryBuilder::with_capacity(options.item_count, options.total_value_capacity),
),
// TODO: Implement the other value types
_ => unimplemented!(),
};
Expand Down Expand Up @@ -420,6 +443,18 @@ impl BlockDataBuilder {
}
_ => unreachable!("Invalid value type for block"),
},
ValueBuilder::RoaringBitmapBuilder(ref mut builder) => match value {
Value::RoaringBitmapValue(bitmap) => {
let mut bytes = Vec::with_capacity(bitmap.serialized_size());
match bitmap.serialize_into(&mut bytes) {
Ok(_) => builder.append_value(&bytes),
Err(e) => {
return Err(Box::new(BlockDataAddError::RoaringBitmapError(e)));
}
}
}
_ => unreachable!("Invalid value type for block"),
},
}

Ok(())
Expand Down Expand Up @@ -464,6 +499,11 @@ impl BlockDataBuilder {
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
ValueBuilder::RoaringBitmapBuilder(ref mut builder) => {
value_field = Field::new("value", DataType::Binary, true);
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
};

let schema = Arc::new(arrow::datatypes::Schema::new(vec![
Expand All @@ -484,12 +524,15 @@ impl BlockDataBuilder {
pub enum BlockDataAddError {
#[error("Blockfile key not in order")]
KeyNotInOrder,
#[error("Roaring bitmap error")]
RoaringBitmapError(#[from] Error),
}

impl ChromaError for BlockDataAddError {
fn code(&self) -> ErrorCodes {
match self {
BlockDataAddError::KeyNotInOrder => ErrorCodes::InvalidArgument,
BlockDataAddError::RoaringBitmapError(_) => ErrorCodes::Internal,
}
}
}
Expand Down
Loading
Loading