Skip to content

Commit

Permalink
[ENH] add block builder
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 6, 2024
1 parent f1159f4 commit d5ea675
Showing 1 changed file with 233 additions and 0 deletions.
233 changes: 233 additions & 0 deletions rust/worker/src/blockstore/arrow_blockfile/block/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,236 @@ impl BlockData {
total_size
}
}

// ============== BlockDataBuilder ==============

enum KeyBuilder {
StringBuilder(StringBuilder),
FloatBuilder(Float32Builder),
BoolBuilder(BooleanBuilder),
}

enum ValueBuilder {
Int32ArrayValueBuilder(ListBuilder<Int32Builder>),
StringValueBuilder(StringBuilder),
}

pub(super) struct BlockDataBuilder {
prefix_builder: StringBuilder,
key_builder: KeyBuilder,
value_builder: ValueBuilder,
}

pub(super) struct BlockBuilderOptions {
pub(super) item_count: usize,
pub(super) prefix_data_capacity: usize,
pub(super) key_data_capacity: usize,
pub(super) total_value_count: usize,
pub(super) total_value_capacity: usize,
}

impl BlockBuilderOptions {
pub(super) fn new(
item_count: usize,
prefix_data_capacity: usize,
key_data_capacity: usize,
total_value_count: usize,
total_value_capacity: usize,
) -> Self {
Self {
item_count,
prefix_data_capacity,
key_data_capacity,
total_value_count,
total_value_capacity,
}
}

pub(super) fn default() -> Self {
Self {
item_count: 1024,
prefix_data_capacity: 1024,
key_data_capacity: 1024,
total_value_count: 1024,
total_value_capacity: 1024,
}
}
}

impl BlockDataBuilder {
pub(super) fn new(
key_type: KeyType,
value_type: ValueType,
options: Option<BlockBuilderOptions>,
) -> Self {
let options = options.unwrap_or(BlockBuilderOptions::default());
let prefix_builder =
StringBuilder::with_capacity(options.item_count, options.prefix_data_capacity);
let key_builder = match key_type {
KeyType::String => KeyBuilder::StringBuilder(StringBuilder::with_capacity(
options.item_count,
options.key_data_capacity,
)),
KeyType::Float => {
KeyBuilder::FloatBuilder(Float32Builder::with_capacity(options.item_count))
}
KeyType::Bool => {
KeyBuilder::BoolBuilder(BooleanBuilder::with_capacity(options.item_count))
}
};
let value_builder = match value_type {
ValueType::Int32Array => {
ValueBuilder::Int32ArrayValueBuilder(ListBuilder::with_capacity(
Int32Builder::with_capacity(options.total_value_count),
options.item_count,
))
}
ValueType::String => ValueBuilder::StringValueBuilder(StringBuilder::with_capacity(
options.item_count,
options.total_value_capacity,
)),
_ => unimplemented!(),
};
Self {
prefix_builder,
key_builder,
value_builder,
}
}

pub(super) fn add(&mut self, key: BlockfileKey, value: Value) {
// TODO: you must add in sorted order, error if not
self.prefix_builder.append_value(key.prefix);
match self.key_builder {
KeyBuilder::StringBuilder(ref mut builder) => match key.key {
Key::String(key) => {
builder.append_value(key);
}
_ => unreachable!("Invalid key type for block"),
},
KeyBuilder::FloatBuilder(ref mut builder) => match key.key {
Key::Float(key) => {
builder.append_value(key);
}
_ => unreachable!("Invalid key type for block"),
},
KeyBuilder::BoolBuilder(ref mut builder) => match key.key {
Key::Bool(key) => {
builder.append_value(key);
}
_ => unreachable!("Invalid key type for block"),
},
}

match self.value_builder {
ValueBuilder::Int32ArrayValueBuilder(ref mut builder) => match value {
Value::Int32ArrayValue(array) => {
builder.append_value(&array);
}
_ => unimplemented!(),
},
ValueBuilder::StringValueBuilder(ref mut builder) => match value {
Value::StringValue(string) => {
builder.append_value(string);
}
_ => unimplemented!(),
},
}
}

pub(super) fn build(&mut self) -> BlockData {
let prefix = self.prefix_builder.finish();
let prefix_field = Field::new("prefix", DataType::Utf8, true);
// TODO: figure out how to get rid of nullable, the builders turn it on by default but we don't want it
let key_field;
let key = match self.key_builder {
KeyBuilder::StringBuilder(ref mut builder) => {
key_field = Field::new("key", DataType::Utf8, true);
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
KeyBuilder::FloatBuilder(ref mut builder) => {
key_field = Field::new("key", DataType::Float32, true);
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
KeyBuilder::BoolBuilder(ref mut builder) => {
key_field = Field::new("key", DataType::Boolean, true);
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
};

let value_field;
let value = match self.value_builder {
ValueBuilder::Int32ArrayValueBuilder(ref mut builder) => {
value_field = Field::new(
"value",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
true,
);
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
ValueBuilder::StringValueBuilder(ref mut builder) => {
value_field = Field::new("value", DataType::Utf8, true);
let arr = builder.finish();
(&arr as &dyn Array).slice(0, arr.len())
}
};

let schema = Arc::new(arrow::datatypes::Schema::new(vec![
prefix_field,
key_field,
value_field,
]));
let record_batch =
RecordBatch::try_new(schema, vec![Arc::new(prefix), Arc::new(key), value]);
BlockData::new(record_batch.unwrap())
}

pub(super) fn get_size(&self) -> usize {
let size = 0;
size
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::blockstore::types::Key;
use arrow::array::Int32Array;

#[test]
fn test_block_builder() {
let num_entries = 1000;

let mut keys = Vec::new();
let mut key_bytes = 0;
for i in 0..num_entries {
keys.push(Key::String(i.to_string()));
key_bytes += i.to_string().len();
}

let prefix = "key".to_string();
let prefix_bytes = prefix.len() * num_entries;
let mut block_builder = BlockDataBuilder::new(
KeyType::String,
ValueType::Int32Array,
Some(BlockBuilderOptions::new(
num_entries,
prefix_bytes,
key_bytes,
num_entries * 2, // 2 int32s per entry
num_entries * 2 * 4, // 2 int32s per entry
)),
);

for i in 0..num_entries {
block_builder.add(
BlockfileKey::new(prefix.clone(), keys[i].clone()),
Value::Int32ArrayValue(Int32Array::from(vec![i as i32, (i + 1) as i32])),
);
}
}
}

0 comments on commit d5ea675

Please sign in to comment.