Skip to content

Commit

Permalink
[CLN] Make LocalStorage use sync filesystem APIs (#2531)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - This PR makes LocalStorage use sync filesystem APIs
 - New functionality
	 - ...

## Test plan
*How are these changes tested?*

- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
Ishiihara authored Jul 17, 2024
1 parent ede2750 commit cdec540
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 110 deletions.
5 changes: 2 additions & 3 deletions rust/worker/src/blockstore/arrow/concurrency_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod tests {
cache::Cache,
config::{CacheConfig, UnboundedCacheConfig},
},
storage::{sync_local::SyncLocalStorage, Storage},
storage::{local::LocalStorage, Storage},
};
use rand::Rng;
use shuttle::{future, thread};
Expand All @@ -16,8 +16,7 @@ mod tests {
shuttle::check_random(
|| {
let tmp_dir = tempfile::tempdir().unwrap();
let storage =
Storage::SyncLocal(SyncLocalStorage::new(tmp_dir.path().to_str().unwrap()));
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let sparse_index_cache =
Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
Expand Down
2 changes: 0 additions & 2 deletions rust/worker/src/storage/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ pub(crate) enum StorageConfig {
S3(S3StorageConfig),
#[serde(alias = "local")]
Local(LocalStorageConfig),
#[serde(alias = "sync_local")]
SyncLocal(LocalStorageConfig),
}

#[derive(Deserialize, PartialEq, Debug)]
Expand Down
8 changes: 4 additions & 4 deletions rust/worker/src/storage/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl LocalStorage {
) -> Result<Box<dyn Stream<Item = ByteStreamItem> + Unpin + Send>, String> {
let file_path = format!("{}/{}", self.root, key);
tracing::debug!("Reading from path: {}", file_path);
match tokio::fs::File::open(file_path).await {
match std::fs::File::open(file_path) {
Ok(file) => {
let stream = file.byte_stream();
return Ok(Box::new(stream));
Expand All @@ -41,8 +41,8 @@ impl LocalStorage {
// Create the path if it doesn't exist, we unwrap since this should only be used in tests
let as_path = std::path::Path::new(&path);
let parent = as_path.parent().unwrap();
tokio::fs::create_dir_all(parent).await.unwrap();
let res = tokio::fs::write(&path, bytes).await;
std::fs::create_dir_all(parent).unwrap();
let res = std::fs::write(&path, bytes);
match res {
Ok(_) => {
return Ok(());
Expand All @@ -54,7 +54,7 @@ impl LocalStorage {
}

pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), String> {
let file = tokio::fs::read(path).await;
let file = std::fs::read(path);
match file {
Ok(bytes_u8) => {
return self.put_bytes(key, &bytes_u8).await;
Expand Down
20 changes: 0 additions & 20 deletions rust/worker/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ pub(crate) mod config;
pub(crate) mod local;
pub(crate) mod s3;
pub(crate) mod stream;
pub(crate) mod sync_local;
use futures::Stream;
use thiserror::Error;

#[derive(Clone)]
pub(crate) enum Storage {
S3(s3::S3Storage),
Local(local::LocalStorage),
SyncLocal(sync_local::SyncLocalStorage),
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -78,13 +76,6 @@ impl Storage {
Err(e) => Err(GetError::LocalError(e)),
}
}
Storage::SyncLocal(sync_local) => {
let res = sync_local.get(key).await;
match res {
Ok(res) => Ok(res),
Err(e) => Err(GetError::LocalError(e)),
}
}
}
}

Expand All @@ -98,10 +89,6 @@ impl Storage {
.put_file(key, path)
.await
.map_err(|e| PutError::LocalError(e)),
Storage::SyncLocal(sync_local) => sync_local
.put_file(key, path)
.await
.map_err(|e| PutError::LocalError(e)),
}
}

Expand All @@ -115,10 +102,6 @@ impl Storage {
.put_bytes(key, &bytes)
.await
.map_err(|e| PutError::LocalError(e)),
Storage::SyncLocal(sync_local) => sync_local
.put_bytes(key, &bytes)
.await
.map_err(|e| PutError::LocalError(e)),
}
}
}
Expand All @@ -129,8 +112,5 @@ pub(crate) async fn from_config(config: &StorageConfig) -> Result<Storage, Box<d
StorageConfig::Local(_) => Ok(Storage::Local(
local::LocalStorage::try_from_config(config).await?,
)),
StorageConfig::SyncLocal(_) => Ok(Storage::SyncLocal(
sync_local::SyncLocalStorage::try_from_config(config).await?,
)),
}
}
81 changes: 0 additions & 81 deletions rust/worker/src/storage/sync_local.rs

This file was deleted.

0 comments on commit cdec540

Please sign in to comment.