Skip to content

Commit

Permalink
[ENH] Add block data builder
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 6, 2024
1 parent a7db1d9 commit a5eb5d7
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 16 deletions.
114 changes: 98 additions & 16 deletions rust/worker/src/blockstore/arrow_blockfile/block/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,21 @@ enum ValueBuilder {
StringValueBuilder(StringBuilder),
}

/// BlockDataBuilder is used to build a block. It is used to add data to a block and then build the BlockData once all data has been added.
/// It is only used internally to an arrow_blockfile.
pub(super) struct BlockDataBuilder {
prefix_builder: StringBuilder,
key_builder: KeyBuilder,
value_builder: ValueBuilder,
last_key: Option<BlockfileKey>,
}

/// ## Options for the BlockDataBuilder
/// - item_count: The number of items in the block
/// - prefix_data_capacity: The required capacity for the prefix data. This will be rounded to the nearest 64 byte alignment by arrow.
/// - key_data_capacity: The required capacity for the key data. This will be rounded to the nearest 64 byte alignment by arrow.
/// - total_value_count: The total number of values in the block
/// - total_value_capacity: The required capacity for the value data. This will be rounded to the nearest 64 byte alignment by arrow.
pub(super) struct BlockBuilderOptions {
pub(super) item_count: usize,
pub(super) prefix_data_capacity: usize,
Expand Down Expand Up @@ -310,17 +319,32 @@ impl BlockDataBuilder {
options.item_count,
options.total_value_capacity,
)),
// TODO: Implement the other value types
_ => unimplemented!(),
};
Self {
prefix_builder,
key_builder,
value_builder,
last_key: None,
}
}

pub(super) fn add(&mut self, key: BlockfileKey, value: Value) {
// TODO: you must add in sorted order, error if not
/// Adds a key, value pair to the block. The key must be greater than the last key added to the block otherwise an error is returned.
pub(super) fn add(
&mut self,
key: BlockfileKey,
value: Value,
) -> Result<(), Box<BlockDataAddError>> {
match &self.last_key {
Some(last_key) => {
if key < *last_key {
return Err(Box::new(BlockDataAddError::KeyNotInOrder));
}
}
None => {}
}
self.last_key = Some(key.clone());
self.prefix_builder.append_value(key.prefix);
match self.key_builder {
KeyBuilder::StringBuilder(ref mut builder) => match key.key {
Expand Down Expand Up @@ -348,18 +372,20 @@ impl BlockDataBuilder {
Value::Int32ArrayValue(array) => {
builder.append_value(&array);
}
_ => unimplemented!(),
_ => unreachable!("Invalid value type for block"),
},
ValueBuilder::StringValueBuilder(ref mut builder) => match value {
Value::StringValue(string) => {
builder.append_value(string);
}
_ => unimplemented!(),
_ => unreachable!("Invalid value type for block"),
},
}

Ok(())
}

pub(super) fn build(&mut self) -> BlockData {
pub(super) fn build(&mut self) -> Result<BlockData, BlockDataBuildError> {
let prefix = self.prefix_builder.finish();
let prefix_field = Field::new("prefix", DataType::Utf8, true);
// TODO: figure out how to get rid of nullable, the builders turn it on by default but we don't want it
Expand Down Expand Up @@ -407,12 +433,38 @@ impl BlockDataBuilder {
]));
let record_batch =
RecordBatch::try_new(schema, vec![Arc::new(prefix), Arc::new(key), value]);
BlockData::new(record_batch.unwrap())
match record_batch {
Ok(record_batch) => Ok(BlockData::new(record_batch)),
Err(e) => Err(BlockDataBuildError::ArrowError(e)),
}
}
}

#[derive(Error, Debug)]
pub enum BlockDataAddError {
#[error("Blockfile key not in order")]
KeyNotInOrder,
}

impl ChromaError for BlockDataAddError {
fn code(&self) -> ErrorCodes {
match self {
BlockDataAddError::KeyNotInOrder => ErrorCodes::InvalidArgument,
}
}
}

#[derive(Error, Debug)]
pub enum BlockDataBuildError {
#[error("Arrow error")]
ArrowError(#[from] arrow::error::ArrowError),
}

pub(super) fn get_size(&self) -> usize {
let size = 0;
size
impl ChromaError for BlockDataBuildError {
fn code(&self) -> ErrorCodes {
match self {
BlockDataBuildError::ArrowError(_) => ErrorCodes::Internal,
}
}
}

Expand All @@ -423,13 +475,13 @@ mod test {
use arrow::array::Int32Array;

#[test]
fn test_block_builder() {
fn test_block_builder_can_add() {
let num_entries = 1000;

let mut keys = Vec::new();
let mut key_bytes = 0;
for i in 0..num_entries {
keys.push(Key::String(i.to_string()));
keys.push(Key::String(format!("{:04}", i)));
key_bytes += i.to_string().len();
}

Expand All @@ -442,16 +494,46 @@ mod test {
num_entries,
prefix_bytes,
key_bytes,
num_entries * 2, // 2 int32s per entry
num_entries, // 2 int32s per entry
num_entries * 2 * 4, // 2 int32s per entry
)),
);

for i in 0..num_entries {
block_builder.add(
BlockfileKey::new(prefix.clone(), keys[i].clone()),
Value::Int32ArrayValue(Int32Array::from(vec![i as i32, (i + 1) as i32])),
);
block_builder
.add(
BlockfileKey::new(prefix.clone(), keys[i].clone()),
Value::Int32ArrayValue(Int32Array::from(vec![i as i32, (i + 1) as i32])),
)
.unwrap();
}
}

#[test]
fn test_out_of_order_key_fails() {
let mut block_builder = BlockDataBuilder::new(
KeyType::String,
ValueType::Int32Array,
Some(BlockBuilderOptions::default()),
);

block_builder
.add(
BlockfileKey::new("key".to_string(), Key::String("b".to_string())),
Value::Int32ArrayValue(Int32Array::from(vec![1, 2])),
)
.unwrap();

let result = block_builder.add(
BlockfileKey::new("key".to_string(), Key::String("a".to_string())),
Value::Int32ArrayValue(Int32Array::from(vec![1, 2])),
);

match result {
Ok(_) => panic!("Expected error"),
Err(e) => {
assert_eq!(e.code(), ErrorCodes::InvalidArgument);
}
}
}
}
1 change: 1 addition & 0 deletions rust/worker/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::error::Error;

#[derive(PartialEq, Debug)]
pub(crate) enum ErrorCodes {
// OK is returned on success, we use "Success" since Ok is a keyword in Rust.
Success = 0,
Expand Down

0 comments on commit a5eb5d7

Please sign in to comment.