Skip to content

Commit

Permalink
Combine FsRepository and OpenFsRepository
Browse files Browse the repository at this point in the history
Use the type state pattern to combine these two types into a single
type. This is some groundwork for creating new types to represent repos
that have renders or not. This code doesn't make much use of the type
state pattern yet.

`FsRepository` used to represent a repo that may not be opened yet, this
is now represented by the type alias:

    type MaybeOpenFsRepository = FsRepository<MaybeOpenFsRepositoryImpl>

An already opened repository has the same name, but is now a type alias:

    type OpenFsRepository = FsRepository<OpenFsRepositoryImpl>

The original types have been renamed to these *Impl names:

    FsRepository -> MaybeOpenFsRepositoryImpl
    OpenFsRepository -> OpenFsRepositoryImpl

Signed-off-by: J Robert Ray <[email protected]>
  • Loading branch information
jrray committed Dec 10, 2024
1 parent 208b237 commit d08ddba
Show file tree
Hide file tree
Showing 31 changed files with 467 additions and 159 deletions.
2 changes: 1 addition & 1 deletion crates/spfs-cli/main/src/cmd_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl InitSubcommand {
pub async fn run(&self, _config: &spfs::Config) -> Result<i32> {
match self {
Self::Repo { path } => {
spfs::storage::fs::FsRepository::create(&path).await?;
spfs::storage::fs::MaybeOpenFsRepository::create(&path).await?;
Ok(0)
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/spfs-vfs/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use fuser::{
Request,
};
use spfs::prelude::*;
use spfs::storage::LocalRepository;
#[cfg(feature = "fuse-backend-abi-7-31")]
use spfs::tracking::BlobRead;
use spfs::tracking::{Entry, EntryKind, EnvSpec, Manifest};
Expand Down Expand Up @@ -381,7 +382,7 @@ impl Filesystem {
reply.error(libc::ENOENT);
return;
};
let payload_path = fs_repo.payloads.build_digest_path(digest);
let payload_path = fs_repo.payloads().build_digest_path(digest);
match std::fs::OpenOptions::new().read(true).open(payload_path) {
Ok(file) => {
handle = Some(Handle::BlobFile { entry, file });
Expand Down
3 changes: 2 additions & 1 deletion crates/spfs-vfs/src/winfsp/mount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::Arc;
use dashmap::DashMap;
use libc::c_void;
use spfs::prelude::*;
use spfs::storage::LocalRepository;
use spfs::tracking::{Entry, EntryKind};
use spfs::OsError;
use tokio::io::AsyncReadExt;
Expand Down Expand Up @@ -287,7 +288,7 @@ impl winfsp::filesystem::FileSystemContext for Mount {
send.send(Err(winfsp::FspError::IO(std::io::ErrorKind::NotFound)));
return;
};
let payload_path = fs_repo.payloads.build_digest_path(&digest);
let payload_path = fs_repo.payloads().build_digest_path(&digest);
match std::fs::OpenOptions::new().read(true).open(payload_path) {
Ok(file) => {
let _ = send.send(Ok(Some(Handle::BlobFile { entry, file })));
Expand Down
2 changes: 1 addition & 1 deletion crates/spfs/benches/spfs_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn commit_benchmark(c: &mut Criterion) {
.expect("create a temp directory for spfs repo");
let repo: Arc<RepositoryHandle> = Arc::new(
tokio_runtime
.block_on(spfs::storage::fs::FsRepository::create(
.block_on(spfs::storage::fs::MaybeOpenFsRepository::create(
repo_path.path().join("repo"),
))
.expect("create spfs repo")
Expand Down
4 changes: 2 additions & 2 deletions crates/spfs/src/bootstrap_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn test_shell_initialization_startup_scripts(
};
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::create(&root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -113,7 +113,7 @@ async fn test_shell_initialization_no_startup_scripts(shell: &str, tmpdir: tempf
};
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::create(&root)
.await
.unwrap(),
);
Expand Down
7 changes: 4 additions & 3 deletions crates/spfs/src/clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use progress_bar_derive_macro::ProgressBar;
use super::prune::PruneParameters;
use crate::prelude::*;
use crate::runtime::makedirs_with_perms;
use crate::storage::fs::OpenFsRepository;
use crate::storage::fs::FsRepositoryOps;
use crate::{encoding, graph, storage, tracking, Error, Result};

#[cfg(test)]
Expand Down Expand Up @@ -609,7 +609,7 @@ where
async fn remove_unvisited_renders_and_proxies_for_storage(
&self,
username: Option<String>,
repo: &storage::fs::OpenFsRepository,
repo: impl FsRepositoryOps,
) -> Result<CleanResult> {
let mut result = CleanResult::default();
let mut stream = repo
Expand Down Expand Up @@ -715,7 +715,8 @@ where
let future = async move {
if !self.dry_run {
tracing::trace!(?path, "removing proxy render");
OpenFsRepository::remove_dir_atomically(&path, &workdir).await?;
storage::fs::OpenFsRepository::remove_dir_atomically(&path, &workdir)
.await?;
}
Ok(digest)
};
Expand Down
8 changes: 4 additions & 4 deletions crates/spfs/src/clean_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ async fn test_clean_untagged_objects_layers_platforms(#[future] tmprepo: TempRep
async fn test_clean_manifest_renders(tmpdir: tempfile::TempDir) {
init_logging();
let tmprepo = Arc::new(
storage::fs::FsRepository::create(tmpdir.path())
storage::fs::MaybeOpenFsRepository::create(tmpdir.path())
.await
.unwrap()
.into(),
Expand Down Expand Up @@ -254,12 +254,12 @@ async fn test_clean_manifest_renders(tmpdir: tempfile::TempDir) {
};
let fs_repo = fs_repo.opened().await.unwrap();

storage::fs::Renderer::new(&*fs_repo)
storage::fs::Renderer::new(&fs_repo)
.render_manifest(&manifest.to_graph_manifest(), None)
.await
.unwrap();

let files = list_files(fs_repo.objects.root());
let files = list_files(fs_repo.fs_impl.objects.root());
assert!(!files.is_empty(), "should have stored data");

let cleaner = Cleaner::new(&tmprepo).with_reporter(TracingCleanReporter);
Expand All @@ -269,7 +269,7 @@ async fn test_clean_manifest_renders(tmpdir: tempfile::TempDir) {
.expect("failed to clean repo");
println!("{result:#?}");

let files = list_files(fs_repo.renders.as_ref().unwrap().renders.root());
let files = list_files(fs_repo.fs_impl.renders.as_ref().unwrap().renders.root());
assert_eq!(
files,
Vec::<String>::new(),
Expand Down
4 changes: 2 additions & 2 deletions crates/spfs/src/commit_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use crate::Error;
async fn test_commit_empty(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(&root)
crate::storage::fs::MaybeOpenFsRepository::create(&root)
.await
.unwrap(),
);
let storage = crate::runtime::Storage::new(repo).unwrap();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down
11 changes: 6 additions & 5 deletions crates/spfs/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,9 @@ impl RemoteConfig {
inner,
} = self;
let mut handle: storage::RepositoryHandle = match inner.clone() {
RepositoryConfig::Fs(config) => {
storage::fs::FsRepository::from_config(config).await?.into()
}
RepositoryConfig::Fs(config) => storage::fs::MaybeOpenFsRepository::from_config(config)
.await?
.into(),
RepositoryConfig::Tar(config) => storage::tar::TarRepository::from_config(config)
.await?
.into(),
Expand Down Expand Up @@ -555,7 +555,8 @@ impl Config {
source,
})?;

local_repo.set_tag_namespace(self.storage.tag_namespace.clone());
Arc::make_mut(&mut local_repo.fs_impl)
.set_tag_namespace(self.storage.tag_namespace.clone());

Ok(local_repo)
}
Expand All @@ -564,7 +565,7 @@ impl Config {
///
/// The returned repo is guaranteed to be created, valid and open already. Ie
/// the local repository is not allowed to be lazily opened.
pub async fn get_local_repository(&self) -> Result<storage::fs::FsRepository> {
pub async fn get_local_repository(&self) -> Result<storage::fs::OpenFsRepository> {
self.get_opened_local_repository().await.map(Into::into)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/spfs/src/config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_config_get_remote() {
.tempdir()
.unwrap();
let remote = tmpdir.path().join("remote");
let _ = crate::storage::fs::FsRepository::create(&remote)
let _ = crate::storage::fs::MaybeOpenFsRepository::create(&remote)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/spfs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl Error {

/// Create an [`Error::FailedToOpenRepository`] instance for
/// a repository using its address and root cause.
pub fn failed_to_open_repository<R: storage::Repository>(
pub fn failed_to_open_repository<R: storage::Address>(
repo: &R,
source: storage::OpenRepositoryError,
) -> Self {
Expand Down
23 changes: 15 additions & 8 deletions crates/spfs/src/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,19 @@ impl TempRepo {
{
match self {
TempRepo::FS(_, tempdir) => {
let mut repo = spfs::storage::fs::FsRepository::open(tempdir.path().join("repo"))
.await
.unwrap();
repo.set_tag_namespace(Some(spfs::storage::TagNamespaceBuf::new(
namespace.as_ref(),
)));
let repo = spfs::storage::fs::MaybeOpenFsRepository {
fs_impl: {
let mut fs_impl = spfs::storage::fs::MaybeOpenFsRepositoryImpl::open(
tempdir.path().join("repo"),
)
.await
.unwrap();
fs_impl.set_tag_namespace(Some(spfs::storage::TagNamespaceBuf::new(
namespace.as_ref(),
)));
fs_impl.into()
},
};
TempRepo::FS(Arc::new(repo.into()), Arc::clone(tempdir))
}
_ => panic!("only TempRepo::FS type supports setting tag namespaces"),
Expand Down Expand Up @@ -136,7 +143,7 @@ pub async fn tmprepo(kind: &str) -> TempRepo {
let tmpdir = tmpdir();
match kind {
"fs" => {
let repo = spfs::storage::fs::FsRepository::create(tmpdir.path().join("repo"))
let repo = spfs::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("repo"))
.await
.unwrap()
.into();
Expand All @@ -153,7 +160,7 @@ pub async fn tmprepo(kind: &str) -> TempRepo {
"rpc" => {
use crate::storage::prelude::*;
let repo = std::sync::Arc::new(spfs::storage::RepositoryHandle::FS(
spfs::storage::fs::FsRepository::create(tmpdir.path().join("repo"))
spfs::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("repo"))
.await
.unwrap(),
));
Expand Down
2 changes: 1 addition & 1 deletion crates/spfs/src/runtime/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ impl Storage {
pub async fn durable_path(&self, name: String) -> Result<PathBuf> {
match &*self.inner {
RepositoryHandle::FS(repo) => {
let mut upper_root_path = repo.root();
let mut upper_root_path = repo.fs_impl.root();
upper_root_path.push(DURABLE_EDITS_DIR);
upper_root_path.push(name);
Ok(upper_root_path)
Expand Down
20 changes: 10 additions & 10 deletions crates/spfs/src/runtime/storage_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn test_config_serialization() {
async fn test_storage_create_runtime(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -73,7 +73,7 @@ async fn test_storage_runtime_with_annotation(

let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -139,7 +139,7 @@ async fn test_storage_runtime_add_annotations_list(

let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -212,7 +212,7 @@ async fn test_storage_runtime_with_nested_annotation(
// Setup the objects needed for the runtime used in the test
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn test_storage_runtime_with_annotation_all(

let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -355,7 +355,7 @@ async fn test_storage_runtime_with_nested_annotation_all(
// setup the objects needed for the runtime used in the test
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -431,7 +431,7 @@ async fn test_storage_runtime_with_nested_annotation_all(
async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand All @@ -452,7 +452,7 @@ async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) {
async fn test_storage_iter_runtimes(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -504,7 +504,7 @@ async fn test_storage_iter_runtimes(tmpdir: tempfile::TempDir) {
async fn test_runtime_reset(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down Expand Up @@ -551,7 +551,7 @@ async fn test_runtime_reset(tmpdir: tempfile::TempDir) {
async fn test_runtime_ensure_extra_bind_mount_locations_exist(tmpdir: tempfile::TempDir) {
let root = tmpdir.path().to_string_lossy().to_string();
let repo = crate::storage::RepositoryHandle::from(
crate::storage::fs::FsRepository::create(root)
crate::storage::fs::MaybeOpenFsRepository::create(root)
.await
.unwrap(),
);
Expand Down
7 changes: 4 additions & 3 deletions crates/spfs/src/storage/fallback/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ impl TagStorageMut for FallbackProxy {
&mut self,
tag_namespace: Option<TagNamespaceBuf>,
) -> Result<Option<TagNamespaceBuf>> {
Ok(Arc::make_mut(&mut self.primary).set_tag_namespace(tag_namespace))
Ok(Arc::make_mut(&mut Arc::make_mut(&mut self.primary).fs_impl)
.set_tag_namespace(tag_namespace))
}
}

Expand All @@ -386,12 +387,12 @@ impl Address for FallbackProxy {
impl LocalRepository for FallbackProxy {
#[inline]
fn payloads(&self) -> &FsHashStore {
self.primary.payloads()
self.primary.fs_impl.payloads()
}

#[inline]
fn render_store(&self) -> Result<&RenderStore> {
self.primary.render_store()
self.primary.fs_impl.render_store()
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/spfs/src/storage/fallback/repository_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn test_proxy_payload_repair(tmpdir: tempfile::TempDir) {
.unwrap();

// Delete the payload file from the primary repo.
let payload_path = primary.payloads.build_digest_path(&digest);
let payload_path = primary.fs_impl.payloads.build_digest_path(&digest);
tokio::fs::remove_file(payload_path).await.unwrap();

// Loading the payload from the primary should fail.
Expand Down
Loading

0 comments on commit d08ddba

Please sign in to comment.