Skip to content

Commit

Permalink
[ENH] Add arrow blockfile provicer
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 13, 2024
1 parent 69660b8 commit d5c20c5
Showing 1 changed file with 54 additions and 2 deletions.
56 changes: 54 additions & 2 deletions rust/worker/src/blockstore/arrow_blockfile/provider.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,61 @@
use super::block::Block;
use crate::blockstore::{KeyType, ValueType};
use super::{block::Block, blockfile::ArrowBlockfile};
use crate::blockstore::{
provider::{BlockfileProvider, CreateError, OpenError},
Blockfile, KeyType, ValueType,
};
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;

/// A BlockFileProvider that creates ArrowBlockfiles (Arrow-backed blockfiles used for production).
/// For now, it keeps a simple local cache of blockfiles.
pub(super) struct ArrowBlockfileProvider {
block_provider: ArrowBlockProvider,
files: HashMap<String, Box<dyn Blockfile>>,
}

impl BlockfileProvider for ArrowBlockfileProvider {
fn new() -> Self {
Self {
block_provider: ArrowBlockProvider::new(),
files: HashMap::new(),
}
}

fn open(&self, path: &str) -> Result<Box<dyn Blockfile>, Box<OpenError>> {
match self.files.get(path) {
Some(file) => Ok(file.clone()),
None => Err(Box::new(OpenError::NotFound)),
}
}

fn create(
&mut self,
path: &str,
key_type: KeyType,
value_type: ValueType,
) -> Result<Box<dyn Blockfile>, Box<CreateError>> {
match self.files.get(path) {
Some(_) => Err(Box::new(CreateError::AlreadyExists)),
None => {
let blockfile = Box::new(ArrowBlockfile::new(
key_type,
value_type,
self.block_provider.clone(),
));
self.files.insert(path.to_string(), blockfile);
Ok(self.files.get(path).unwrap().clone())
}
}
}
}

/// A simple local cache of Arrow-backed blocks, the blockfile provider passes this
/// to the ArrowBlockfile when it creates a new blockfile. So that the blockfile can manage and access blocks
/// # Note
/// The implementation is currently very simple and not intended for robust production use. We should
/// introduce a more sophisticated cache that can handle tiered eviction and other features. This interface
/// is a placeholder for that.
struct ArrowBlockProviderInner {
blocks: HashMap<Uuid, Arc<Block>>,
}
Expand Down

0 comments on commit d5c20c5

Please sign in to comment.