diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/blockfile.rs b/rust/worker/src/blockstore/arrow_blockfile/block/blockfile.rs deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs b/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs index dee9070f57d8..eda61cc0c374 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs @@ -1,3 +1,4 @@ +use super::{Block, BlockBuilderOptions, BlockData, BlockDataBuilder}; use crate::blockstore::{ arrow_blockfile::{blockfile::MAX_BLOCK_SIZE, provider::ArrowBlockProvider}, types::{BlockfileKey, KeyType, Value, ValueType}, @@ -6,8 +7,6 @@ use arrow::util::bit_util; use parking_lot::RwLock; use std::{collections::BTreeMap, sync::Arc}; -use super::{Block, BlockBuilderOptions, BlockData, BlockDataBuilder}; - #[derive(Clone)] pub struct BlockDelta { pub source_block: Arc, @@ -247,8 +246,10 @@ impl BlockDeltaInner { } } -impl From<&BlockDelta> for BlockData { - fn from(delta: &BlockDelta) -> Self { +impl TryFrom<&BlockDelta> for BlockData { + type Error = super::BlockDataBuildError; + + fn try_from(delta: &BlockDelta) -> Result { let mut builder = BlockDataBuilder::new( delta.source_block.get_key_type(), delta.source_block.get_value_type(), @@ -311,7 +312,7 @@ mod test { } let size = delta.get_size(); - let block_data = BlockData::from(&delta); + let block_data = BlockData::try_from(&delta).unwrap(); assert_eq!(size, block_data.get_size()); } @@ -328,7 +329,7 @@ mod test { delta.add(key, value); } let size = delta.get_size(); - let block_data = BlockData::from(&delta); + let block_data = BlockData::try_from(&delta).unwrap(); assert_eq!(size, block_data.get_size()); } @@ -346,7 +347,7 @@ mod test { } let size = delta.get_size(); - let block_data = BlockData::from(&delta); + let block_data = BlockData::try_from(&delta).unwrap(); assert_eq!(size, block_data.get_size()); } } diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs b/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs index 32ffeb4466cd..4ac656dc86b7 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs @@ -1,7 +1,5 @@ -mod blockfile; mod delta; mod iterator; -mod provider; mod types; // Re-export types at the arrow_blockfile module level diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/provider.rs b/rust/worker/src/blockstore/arrow_blockfile/block/provider.rs deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/types.rs b/rust/worker/src/blockstore/arrow_blockfile/block/types.rs index 8700666ee0c2..da6379735b90 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/block/types.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/block/types.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use thiserror::Error; use uuid::Uuid; +use super::delta::BlockDelta; use super::iterator::BlockIterator; /// BlockState represents the state of a block in the blockstore. Conceptually, a block is immutable once the broarder system @@ -58,12 +59,15 @@ pub struct Block { pub enum BlockError { #[error("Invalid state transition")] InvalidStateTransition, + #[error("Block data error")] + BlockDataError(#[from] BlockDataBuildError), } impl ChromaError for BlockError { fn code(&self) -> ErrorCodes { match self { BlockError::InvalidStateTransition => ErrorCodes::Internal, + BlockError::BlockDataError(e) => e.code(), } } } @@ -202,6 +206,29 @@ impl Block { } } + pub fn apply_delta(&self, delta: &BlockDelta) -> Result<(), Box> { + let data = match BlockData::try_from(delta) { + Ok(data) => data, + Err(e) => return Err(Box::new(BlockError::BlockDataError(e))), + }; + let mut inner = self.inner.write(); + match inner.state { + BlockState::Uninitialized => { + inner.data = Some(data); + inner.state = BlockState::Initialized; + Ok(()) + } + BlockState::Initialized => { + inner.data = Some(data); + inner.state = BlockState::Initialized; + Ok(()) + } + BlockState::Commited | BlockState::Registered => { + Err(Box::new(BlockError::InvalidStateTransition)) + } + } + } + pub(super) fn iter(&self) -> BlockIterator { BlockIterator::new( self.clone(), diff --git a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs new file mode 100644 index 000000000000..507455e2e2c0 --- /dev/null +++ b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs @@ -0,0 +1 @@ +pub(super) const MAX_BLOCK_SIZE: usize = 16384; diff --git a/rust/worker/src/blockstore/arrow_blockfile/mod.rs b/rust/worker/src/blockstore/arrow_blockfile/mod.rs index fc9210db1bab..cbda73423547 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/mod.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/mod.rs @@ -1 +1,3 @@ mod block; +mod blockfile; +mod provider; diff --git a/rust/worker/src/blockstore/arrow_blockfile/provider.rs b/rust/worker/src/blockstore/arrow_blockfile/provider.rs new file mode 100644 index 000000000000..59772bb3eaff --- /dev/null +++ b/rust/worker/src/blockstore/arrow_blockfile/provider.rs @@ -0,0 +1,37 @@ +use super::block::Block; +use crate::blockstore::{KeyType, ValueType}; +use parking_lot::RwLock; +use std::{collections::HashMap, sync::Arc}; +use uuid::Uuid; + +struct ArrowBlockProviderInner { + blocks: HashMap>, +} + +#[derive(Clone)] +pub(super) struct ArrowBlockProvider { + inner: Arc>, +} + +impl ArrowBlockProvider { + pub(super) fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(ArrowBlockProviderInner { + blocks: HashMap::new(), + })), + } + } + + pub(super) fn create_block(&self, key_type: KeyType, value_type: ValueType) -> Arc { + let block = Arc::new(Block::new(Uuid::new_v4(), key_type, value_type)); + self.inner + .write() + .blocks + .insert(block.get_id(), block.clone()); + block + } + + pub(super) fn get_block(&self, id: &Uuid) -> Option> { + self.inner.read().blocks.get(id).cloned() + } +}