Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 9, 2024
1 parent 08e08b1 commit 6aa6145
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 53 deletions.
135 changes: 82 additions & 53 deletions rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,57 @@ use arrow::util::bit_util;
use parking_lot::RwLock;
use std::{collections::BTreeMap, sync::Arc};

/// A block delta tracks a source block and represents the new state of a block. Blocks are
/// immutable, so when a write is made to a block, a new block is created with the new state.
/// A block delta is a temporary representation of the new state of a block. A block delta
/// can be converted to a block data, which is then used to create a new block. A block data
/// can be converted into a block delta for new writes.
/// # Methods
/// - can_add: checks if a key value pair can be added to the block delta and still be within the
/// max block size.
/// - add: adds a key value pair to the block delta.
/// - delete: deletes a key from the block delta.
/// - get_min_key: gets the minimum key in the block delta.
/// - get_size: gets the size of the block delta.
/// - split: splits the block delta into two block deltas.
#[derive(Clone)]
pub struct BlockDelta {
pub source_block: Arc<Block>,
inner: Arc<RwLock<BlockDeltaInner>>,
}

impl BlockDelta {
/// Checks if a key value pair can be added to the block delta and still be within the
/// max block size.
pub fn can_add(&self, key: &BlockfileKey, value: &Value) -> bool {
let inner = self.inner.read();
inner.can_add(key, value)
}

/// Adds a key value pair to the block delta.
pub fn add(&self, key: BlockfileKey, value: Value) {
let mut inner = self.inner.write();
inner.add(key, value);
}

/// Deletes a key from the block delta.
pub fn delete(&self, key: BlockfileKey) {
let mut inner = self.inner.write();
inner.delete(key);
}

/// Gets the minimum key in the block delta.
pub fn get_min_key(&self) -> Option<BlockfileKey> {
let inner = self.inner.read();
let first_key = inner.new_data.keys().next();
first_key.cloned()
}

fn get_prefix_size(&self) -> usize {
let inner = self.inner.read();
inner.get_prefix_size()
}

fn get_key_size(&self) -> usize {
let inner = self.inner.read();
inner.get_key_size()
}

fn get_value_size(&self) -> usize {
let inner = self.inner.read();
inner.get_value_size()
}

fn get_value_count(&self) -> usize {
let inner = self.inner.read();
inner.get_value_count()
}

/// Gets the size of the block delta as it would be in a block. This includes
/// the size of the prefix, key, and value data and the size of the offsets
/// where applicable. The size is rounded up to the nearest 64 bytes as per
/// the arrow specification. When a block delta is converted into a block data
/// the same sizing is used to allocate the memory for the block data.
pub fn get_size(&self) -> usize {
let inner = self.inner.read();
inner.get_size(
Expand All @@ -63,11 +66,8 @@ impl BlockDelta {
)
}

fn len(&self) -> usize {
let inner = self.inner.read();
inner.new_data.len()
}

/// Splits the block delta into two block deltas. The split point is the last key
/// that pushes the block over the half size.
pub fn split(&self, provider: &ArrowBlockProvider) -> (BlockfileKey, BlockDelta) {
let new_block = provider.create_block(
self.source_block.get_key_type(),
Expand All @@ -86,6 +86,31 @@ impl BlockDelta {
},
)
}

fn get_prefix_size(&self) -> usize {
let inner = self.inner.read();
inner.get_prefix_size()
}

fn get_key_size(&self) -> usize {
let inner = self.inner.read();
inner.get_key_size()
}

fn get_value_size(&self) -> usize {
let inner = self.inner.read();
inner.get_value_size()
}

fn get_value_count(&self) -> usize {
let inner = self.inner.read();
inner.get_value_count()
}

fn len(&self) -> usize {
let inner = self.inner.read();
inner.new_data.len()
}
}

struct BlockDeltaInner {
Expand Down Expand Up @@ -117,19 +142,10 @@ impl BlockDeltaInner {

// https://docs.rs/arrow/latest/arrow/array/array/struct.GenericListArray.html
let key_total_bytes = bit_util::round_upto_multiple_of_64(key_size);
let key_offset_bytes = match key_type {
KeyType::String => bit_util::round_upto_multiple_of_64((item_count + 1) * 4),
KeyType::Float => 0,
_ => unimplemented!("Key type not implemented"),
};
let key_offset_bytes = self.offset_size_for_key_type(item_count, key_type);

let value_total_bytes = bit_util::round_upto_multiple_of_64(value_size);
let value_offset_bytes = match value_type {
ValueType::Int32Array | ValueType::String => {
bit_util::round_upto_multiple_of_64((item_count + 1) * 4)
}
_ => unimplemented!("Value type not implemented"),
};
let value_offset_bytes = self.offset_size_for_value_type(item_count, value_type);

prefix_total_bytes
+ prefix_offset_bytes
Expand Down Expand Up @@ -181,30 +197,45 @@ impl BlockDeltaInner {
}

fn can_add(&self, key: &BlockfileKey, value: &Value) -> bool {
// TODO: move this into add with an error
let additional_prefix_size = key.get_prefix_size();
let additional_key_size = key.key.get_size();
let additional_value_size = value.get_size();

let prefix_data_size = self.get_prefix_size() + additional_prefix_size;
let key_data_size = self.get_key_size() + additional_key_size;
let value_data_size = self.get_value_size() + additional_value_size;
// TODO: use the same offset matching as in get_block_size
let prefix_offset_size = (self.new_data.len() + 1) * 4;
let key_offset_size = (self.new_data.len() + 1) * 4;
let value_offset_size = (self.new_data.len() + 1) * 4;

let prefix_total_bytes = bit_util::round_upto_multiple_of_64(prefix_data_size)
+ bit_util::round_upto_multiple_of_64(prefix_offset_size);
let key_total_bytes = bit_util::round_upto_multiple_of_64(key_data_size)
+ bit_util::round_upto_multiple_of_64(key_offset_size);
let value_total_bytes = bit_util::round_upto_multiple_of_64(value_data_size)
+ bit_util::round_upto_multiple_of_64(value_offset_size);

let prefix_offset_size = bit_util::round_upto_multiple_of_64((self.new_data.len() + 1) * 4);
let key_offset_size = self.offset_size_for_key_type(self.new_data.len(), key.into());
let value_offset_size = self.offset_size_for_value_type(self.new_data.len(), value.into());

let prefix_total_bytes =
bit_util::round_upto_multiple_of_64(prefix_data_size) + prefix_offset_size;
let key_total_bytes = bit_util::round_upto_multiple_of_64(key_data_size) + key_offset_size;
let value_total_bytes =
bit_util::round_upto_multiple_of_64(value_data_size) + value_offset_size;
let total_future_size = prefix_total_bytes + key_total_bytes + value_total_bytes;

total_future_size <= MAX_BLOCK_SIZE
}

fn offset_size_for_value_type(&self, item_count: usize, value_type: ValueType) -> usize {
match value_type {
ValueType::Int32Array | ValueType::String => {
bit_util::round_upto_multiple_of_64((item_count + 1) * 4)
}
_ => unimplemented!("Value type not implemented"),
}
}

fn offset_size_for_key_type(&self, item_count: usize, key_type: KeyType) -> usize {
match key_type {
KeyType::String => bit_util::round_upto_multiple_of_64((item_count + 1) * 4),
KeyType::Float => 0,
_ => unimplemented!("Key type not implemented"),
}
}

fn split(
&mut self,
key_type: KeyType,
Expand Down Expand Up @@ -287,12 +318,10 @@ impl From<Arc<Block>> for BlockDelta {

#[cfg(test)]
mod test {
use arrow::array::{Array, Int32Array};
use rand::{random, Rng};

use crate::blockstore::types::{Key, KeyType, ValueType};

use super::*;
use crate::blockstore::types::{Key, KeyType, ValueType};
use arrow::array::Int32Array;
use rand::{random, Rng};

#[test]
fn test_sizing_int_arr_val() {
Expand Down
22 changes: 22 additions & 0 deletions rust/worker/src/blockstore/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ impl BlockfileKey {
}
}

impl From<&BlockfileKey> for KeyType {
fn from(key: &BlockfileKey) -> Self {
match key.key {
Key::String(_) => KeyType::String,
Key::Float(_) => KeyType::Float,
Key::Bool(_) => KeyType::Bool,
}
}
}

#[derive(Clone, PartialEq, PartialOrd, Debug)]
pub(crate) enum Key {
String(String),
Expand Down Expand Up @@ -195,6 +205,18 @@ impl Value {
}
}

impl From<&Value> for ValueType {
fn from(value: &Value) -> Self {
match value {
Value::Int32ArrayValue(_) => ValueType::Int32Array,
Value::PositionalPostingListValue(_) => ValueType::PositionalPostingList,
Value::RoaringBitmapValue(_) => ValueType::RoaringBitmap,
Value::StringValue(_) => ValueType::String,
Value::Int32Value(_) => ValueType::Int32,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum ValueType {
Int32Array,
Expand Down

0 comments on commit 6aa6145

Please sign in to comment.