Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add support for delete in blockfile #1957

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{Block, BlockBuilderOptions, BlockData, BlockDataBuilder};
use crate::blockstore::{
arrow_blockfile::{blockfile::MAX_BLOCK_SIZE, provider::ArrowBlockProvider},
types::{BlockfileKey, KeyType, Value, ValueType},
BlockfileError,
};
use arrow::util::bit_util;
use parking_lot::RwLock;
Expand Down Expand Up @@ -41,9 +42,9 @@ impl BlockDelta {
}

/// Deletes a key from the block delta.
pub fn delete(&self, key: BlockfileKey) {
pub fn delete(&self, key: BlockfileKey) -> Result<(), Box<BlockfileError>> {
let mut inner = self.inner.write();
inner.delete(key);
inner.delete(key)
}

/// Gets the minimum key in the block delta.
Expand Down Expand Up @@ -131,10 +132,12 @@ impl BlockDeltaInner {
self.new_data.insert(key, value);
}

fn delete(&mut self, key: BlockfileKey) {
fn delete(&mut self, key: BlockfileKey) -> Result<(), Box<BlockfileError>> {
if self.new_data.contains_key(&key) {
self.new_data.remove(&key);
return Ok(());
}
Err(Box::new(BlockfileError::NotFoundError))
}

fn get_block_size(
Expand Down
6 changes: 6 additions & 0 deletions rust/worker/src/blockstore/arrow_blockfile/block/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ use arrow::array::{Array, BooleanArray, Int32Array, ListArray, StringArray, UInt
/// Iteration state over the entries of a single `Block`.
pub(super) struct BlockIterator {
// The block whose entries are being iterated.
block: Block,
// Cursor into the block; `new` initialises it to 0.
index: usize,
// Entry count captured from `block.len()` in `new`; `next` yields `None` once `index >= length`.
length: usize,
// Key type passed to `new`. NOTE(review): presumably drives the Arrow downcast of keys in `next` — confirm.
key_type: KeyType,
// Value type passed to `new`. NOTE(review): presumably drives the Arrow downcast of values in `next` — confirm.
value_type: ValueType,
}

impl BlockIterator {
pub fn new(block: Block, key_type: KeyType, value_type: ValueType) -> Self {
let len = block.len();
Self {
block,
index: 0,
length: len,
key_type,
value_type,
}
Expand All @@ -31,6 +34,9 @@ impl Iterator for BlockIterator {
if data.is_none() {
return None;
}
if self.index >= self.length {
return None;
}

// Arrow requires us to downcast the array to the specific type we want to work with.
// This is a bit awkward, but it's the way Arrow works to allow for dynamic typing.
Expand Down
137 changes: 114 additions & 23 deletions rust/worker/src/blockstore/arrow_blockfile/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::sparse_index::SparseIndex;
use crate::blockstore::arrow_blockfile::block::delta::BlockDelta;
use crate::blockstore::BlockfileError;
use crate::errors::ChromaError;
use crate::errors::IntoResult;
use parking_lot::Mutex;
use std::sync::Arc;
use thiserror::Error;
Expand Down Expand Up @@ -94,6 +95,48 @@ impl Blockfile for ArrowBlockfile {
}
}

fn delete(&mut self, key: BlockfileKey) -> Result<(), Box<dyn ChromaError>> {
if !self.in_transaction() {
return Err(Box::new(BlockfileError::TransactionNotInProgress));
}

self.validate_key(&key)?;

let transaction_state = match &self.transaction_state {
None => return Err(Box::new(BlockfileError::TransactionNotInProgress)),
Some(transaction_state) => transaction_state,
};

// Note: The code to get the target block as well as get or create the delta is duplicated
// in the set and delete methods. This is due to the lock management making it clunky
// to abstract this into a separate method. This likely should be refactored
// for cleaner reuse, but for now its ok to repeat ourselves.

// Get the target block id for the key
let transaction_sparse_index = transaction_state.sparse_index.lock();
let target_block_id = match *transaction_sparse_index {
None => self.sparse_index.lock().get_target_block_id(&key),
Some(ref index) => index.lock().get_target_block_id(&key),
};

// See if a delta for the target block already exists, if not create a new one and add it to the transaction state
// Creating a delta loads the block entirely into memory
let delta = match transaction_state.get_delta_for_block(&target_block_id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with the rollback mechanism here. What happens if we need to roll back the delete?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It discards the whole in-memory state.

None => {
let target_block = match self.block_provider.get_block(&target_block_id) {
None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)),
Some(block) => block,
};
let delta = BlockDelta::from(target_block);
transaction_state.add_delta(delta.clone());
delta
}
Some(delta) => delta,
};

delta.delete(key).into_result()
}

fn get_by_prefix(
&self,
prefix: String,
Expand Down Expand Up @@ -144,29 +187,7 @@ impl Blockfile for ArrowBlockfile {
return Err(Box::new(BlockfileError::TransactionNotInProgress));
}

// Validate key type
match key.key {
Key::String(_) => {
if self.key_type != KeyType::String {
return Err(Box::new(BlockfileError::InvalidKeyType));
}
}
Key::Float(_) => {
if self.key_type != KeyType::Float {
return Err(Box::new(BlockfileError::InvalidKeyType));
}
}
Key::Bool(_) => {
if self.key_type != KeyType::Bool {
return Err(Box::new(BlockfileError::InvalidKeyType));
}
}
Key::Uint(_) => {
if self.key_type != KeyType::Uint {
return Err(Box::new(BlockfileError::InvalidKeyType));
}
}
}
self.validate_key(&key)?;

// Validate value type
match value {
Expand Down Expand Up @@ -388,6 +409,32 @@ impl ArrowBlockfile {
fn in_transaction(&self) -> bool {
self.transaction_state.is_some()
}

/// Checks that the variant of `key.key` agrees with the key type this blockfile was created with.
///
/// # Errors
/// Returns `BlockfileError::InvalidKeyType` when the key's variant does not match `self.key_type`.
fn validate_key(&self, key: &BlockfileKey) -> Result<(), Box<dyn ChromaError>> {
    // Decide whether the key variant matches first, then build the result in one place.
    let type_matches = match key.key {
        Key::String(_) => self.key_type == KeyType::String,
        Key::Float(_) => self.key_type == KeyType::Float,
        Key::Bool(_) => self.key_type == KeyType::Bool,
        Key::Uint(_) => self.key_type == KeyType::Uint,
    };

    if type_matches {
        Ok(())
    } else {
        Err(Box::new(BlockfileError::InvalidKeyType))
    }
}
}

#[cfg(test)]
Expand Down Expand Up @@ -605,4 +652,48 @@ mod tests {
}
}
}

#[test]
fn test_delete() {
let block_provider = ArrowBlockProvider::new();
let mut blockfile =
ArrowBlockfile::new(KeyType::String, ValueType::Int32Array, block_provider);

blockfile.begin_transaction().unwrap();
let key1 = BlockfileKey::new("key".to_string(), Key::String("zzzz".to_string()));
blockfile
.set(
key1.clone(),
Value::Int32ArrayValue(Int32Array::from(vec![1, 2, 3])),
)
.unwrap();
let key2 = BlockfileKey::new("key".to_string(), Key::String("aaaa".to_string()));
blockfile
.set(
key2.clone(),
Value::Int32ArrayValue(Int32Array::from(vec![4, 5, 6])),
)
.unwrap();
blockfile.commit_transaction().unwrap();

blockfile.begin_transaction().unwrap();
blockfile.delete(key1.clone()).unwrap();
blockfile.commit_transaction().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test rollback here as well?

Copy link
Collaborator Author

@HammadB HammadB Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rollback is unimplemented in this PR. I have a stale stacked PR with proptests.


let res = blockfile.get(key1);
match res {
Err(err) => {
assert_eq!(err.code(), crate::errors::ErrorCodes::NotFound);
}
_ => panic!("Expected not found error"),
}

let res = blockfile.get(key2);
match res {
Ok(_) => {}
Err(err) => {
panic!("Expected key2 to be found, got error: {:?}", err);
}
}
}
}
15 changes: 12 additions & 3 deletions rust/worker/src/blockstore/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ pub(crate) enum BlockfileError {
impl ChromaError for BlockfileError {
fn code(&self) -> ErrorCodes {
match self {
BlockfileError::NotFoundError
| BlockfileError::InvalidKeyType
| BlockfileError::InvalidValueType => ErrorCodes::InvalidArgument,
BlockfileError::NotFoundError => ErrorCodes::NotFound,
BlockfileError::InvalidKeyType | BlockfileError::InvalidValueType => {
ErrorCodes::InvalidArgument
}
BlockfileError::TransactionInProgress | BlockfileError::TransactionNotInProgress => {
ErrorCodes::FailedPrecondition
}
Expand Down Expand Up @@ -266,6 +267,7 @@ pub(crate) trait Blockfile: BlockfileClone {
) -> Result<Vec<(BlockfileKey, Value)>, Box<dyn ChromaError>>;

fn set(&mut self, key: BlockfileKey, value: Value) -> Result<(), Box<dyn ChromaError>>;
fn delete(&mut self, key: BlockfileKey) -> Result<(), Box<dyn ChromaError>>;

fn get_gt(
&self,
Expand Down Expand Up @@ -350,6 +352,13 @@ impl Blockfile for HashMapBlockfile {
Ok(())
}

/// Removes `key` from the in-memory map.
///
/// # Errors
/// Returns `BlockfileError::NotFoundError` when the map holds no entry for `key`.
fn delete(&mut self, key: BlockfileKey) -> Result<(), Box<dyn ChromaError>> {
    let removed = self.map.write().remove(&key);
    if removed.is_none() {
        return Err(Box::new(BlockfileError::NotFoundError));
    }
    Ok(())
}

fn get_gt(
&self,
prefix: String,
Expand Down
17 changes: 17 additions & 0 deletions rust/worker/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// gRPC spec. https://grpc.github.io/grpc/core/md_doc_statuscodes.html
// Custom errors can use these codes in order to allow for generic handling
use std::error::Error;
use std::result::Result;

#[derive(PartialEq, Debug)]
pub(crate) enum ErrorCodes {
Expand Down Expand Up @@ -44,3 +45,19 @@ pub(crate) enum ErrorCodes {
/// Crate-wide error interface: a standard `Error` that is `Send` and can report an
/// `ErrorCodes` value so callers can handle errors generically.
pub(crate) trait ChromaError: Error + Send {
/// Returns the `ErrorCodes` value that classifies this error.
fn code(&self) -> ErrorCodes;
}

/// Conversion into a `Result` whose error type is a boxed `ChromaError` trait object.
pub(crate) trait IntoResult<T> {
/// Consumes `self` and returns it as `Result<T, Box<dyn ChromaError>>`.
fn into_result(self) -> Result<T, Box<dyn ChromaError>>;
}

/// Converts a `Result<R, Box<E>>` with a concrete boxed error into a
/// `Result<R, Box<dyn ChromaError>>`.
///
/// The `Ok` value passes through untouched; an `Err` keeps its existing allocation and is
/// only coerced from `Box<E>` to the `Box<dyn ChromaError>` trait object.
impl<R, E: ChromaError + 'static> IntoResult<R> for Result<R, Box<E>> {
    fn into_result(self) -> Result<R, Box<dyn ChromaError>> {
        // `map_err` replaces the former `is_err()` + `unwrap()` pair, so no panic path remains.
        self.map_err(|err| err as Box<dyn ChromaError>)
    }
}
Loading