From d08ddbaaae38019cf1492d5677f6bb30cb3f5ece Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Wed, 27 Nov 2024 16:29:05 -0800 Subject: [PATCH] Combine FsRepository and OpenFsRepository Use the type state pattern to combine these two types into a single type. This is some groundwork for creating new types to represent repos that have renders or not. This code doesn't make much use of the type state pattern yet. `FsRepository` used to represent a repo that may not be opened yet, this is now represented by the type alias: type MaybeOpenFsRepository = FsRepository An already opened repository has the same name, but is now a type alias: type OpenFsRepository = FsRepository The original types have been renamed to these *Impl names: FsRepository -> MaybeOpenFsRepositoryImpl OpenFsRepository -> OpenFsRepositoryImpl Signed-off-by: J Robert Ray --- crates/spfs-cli/main/src/cmd_init.rs | 2 +- crates/spfs-vfs/src/fuse.rs | 3 +- crates/spfs-vfs/src/winfsp/mount.rs | 3 +- crates/spfs/benches/spfs_bench.rs | 2 +- crates/spfs/src/bootstrap_test.rs | 4 +- crates/spfs/src/clean.rs | 7 +- crates/spfs/src/clean_test.rs | 8 +- crates/spfs/src/commit_test.rs | 4 +- crates/spfs/src/config.rs | 11 +- crates/spfs/src/config_test.rs | 2 +- crates/spfs/src/error.rs | 2 +- crates/spfs/src/fixtures.rs | 23 +- crates/spfs/src/runtime/storage.rs | 2 +- crates/spfs/src/runtime/storage_test.rs | 20 +- .../spfs/src/storage/fallback/repository.rs | 7 +- .../src/storage/fallback/repository_test.rs | 2 +- crates/spfs/src/storage/fs/database.rs | 28 +- crates/spfs/src/storage/fs/mod.rs | 5 +- crates/spfs/src/storage/fs/payloads.rs | 14 +- crates/spfs/src/storage/fs/renderer.rs | 13 +- crates/spfs/src/storage/fs/renderer_test.rs | 7 +- crates/spfs/src/storage/fs/repository.rs | 378 +++++++++++++++--- crates/spfs/src/storage/fs/tag.rs | 17 +- crates/spfs/src/storage/handle.rs | 6 +- .../spfs/src/storage/proxy/repository_test.rs | 27 +- crates/spfs/src/storage/repository_test.rs | 9 +- crates/spfs/src/storage/tag_test.rs | 4 +- crates/spfs/src/storage/tar/repository.rs | 4 +- crates/spfs/src/sync_test.rs | 4 +- crates/spk-storage/src/fixtures.rs | 2 +- crates/spk-storage/src/storage/spfs_test.rs | 6 +- 31 files changed, 467 insertions(+), 159 deletions(-) diff --git a/crates/spfs-cli/main/src/cmd_init.rs b/crates/spfs-cli/main/src/cmd_init.rs index 7692bcff90..fd274f71af 100644 --- a/crates/spfs-cli/main/src/cmd_init.rs +++ b/crates/spfs-cli/main/src/cmd_init.rs @@ -36,7 +36,7 @@ impl InitSubcommand { pub async fn run(&self, _config: &spfs::Config) -> Result { match self { Self::Repo { path } => { - spfs::storage::fs::FsRepository::create(&path).await?; + spfs::storage::fs::MaybeOpenFsRepository::create(&path).await?; Ok(0) } } diff --git a/crates/spfs-vfs/src/fuse.rs b/crates/spfs-vfs/src/fuse.rs index 4ea3dd644e..bbfdcc64f0 100644 --- a/crates/spfs-vfs/src/fuse.rs +++ b/crates/spfs-vfs/src/fuse.rs @@ -29,6 +29,7 @@ use fuser::{ Request, }; use spfs::prelude::*; +use spfs::storage::LocalRepository; #[cfg(feature = "fuse-backend-abi-7-31")] use spfs::tracking::BlobRead; use spfs::tracking::{Entry, EntryKind, EnvSpec, Manifest}; @@ -381,7 +382,7 @@ impl Filesystem { reply.error(libc::ENOENT); return; }; - let payload_path = fs_repo.payloads.build_digest_path(digest); + let payload_path = fs_repo.payloads().build_digest_path(digest); match std::fs::OpenOptions::new().read(true).open(payload_path) { Ok(file) => { handle = Some(Handle::BlobFile { entry, file }); diff --git a/crates/spfs-vfs/src/winfsp/mount.rs b/crates/spfs-vfs/src/winfsp/mount.rs index eec4326fb4..1dadda7943 100644 --- a/crates/spfs-vfs/src/winfsp/mount.rs +++ b/crates/spfs-vfs/src/winfsp/mount.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use dashmap::DashMap; use libc::c_void; use spfs::prelude::*; +use spfs::storage::LocalRepository; use spfs::tracking::{Entry, EntryKind}; use spfs::OsError; use tokio::io::AsyncReadExt; @@ -287,7 +288,7 @@ impl winfsp::filesystem::FileSystemContext for Mount { send.send(Err(winfsp::FspError::IO(std::io::ErrorKind::NotFound))); return; }; - let payload_path = fs_repo.payloads.build_digest_path(&digest); + let payload_path = fs_repo.payloads().build_digest_path(&digest); match std::fs::OpenOptions::new().read(true).open(payload_path) { Ok(file) => { let _ = send.send(Ok(Some(Handle::BlobFile { entry, file }))); diff --git a/crates/spfs/benches/spfs_bench.rs b/crates/spfs/benches/spfs_bench.rs index 54ef4c345a..84cf854e34 100644 --- a/crates/spfs/benches/spfs_bench.rs +++ b/crates/spfs/benches/spfs_bench.rs @@ -44,7 +44,7 @@ pub fn commit_benchmark(c: &mut Criterion) { .expect("create a temp directory for spfs repo"); let repo: Arc = Arc::new( tokio_runtime - .block_on(spfs::storage::fs::FsRepository::create( + .block_on(spfs::storage::fs::MaybeOpenFsRepository::create( repo_path.path().join("repo"), )) .expect("create spfs repo") diff --git a/crates/spfs/src/bootstrap_test.rs b/crates/spfs/src/bootstrap_test.rs index e9d96004f1..e7d8e96b3b 100644 --- a/crates/spfs/src/bootstrap_test.rs +++ b/crates/spfs/src/bootstrap_test.rs @@ -37,7 +37,7 @@ async fn test_shell_initialization_startup_scripts( }; let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(&root) + crate::storage::fs::MaybeOpenFsRepository::create(&root) .await .unwrap(), ); @@ -113,7 +113,7 @@ async fn test_shell_initialization_no_startup_scripts(shell: &str, tmpdir: tempf }; let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(&root) + crate::storage::fs::MaybeOpenFsRepository::create(&root) .await .unwrap(), ); diff --git a/crates/spfs/src/clean.rs b/crates/spfs/src/clean.rs index 84def7cc4c..7c94b4536d 100644 --- a/crates/spfs/src/clean.rs +++ b/crates/spfs/src/clean.rs @@ -18,7 +18,7 @@ use progress_bar_derive_macro::ProgressBar; use super::prune::PruneParameters; use crate::prelude::*; use crate::runtime::makedirs_with_perms; -use crate::storage::fs::OpenFsRepository; +use crate::storage::fs::FsRepositoryOps; use crate::{encoding, graph, storage, tracking, Error, Result}; #[cfg(test)] @@ -609,7 +609,7 @@ where async fn remove_unvisited_renders_and_proxies_for_storage( &self, username: Option, - repo: &storage::fs::OpenFsRepository, + repo: impl FsRepositoryOps, ) -> Result { let mut result = CleanResult::default(); let mut stream = repo @@ -715,7 +715,8 @@ where let future = async move { if !self.dry_run { tracing::trace!(?path, "removing proxy render"); - OpenFsRepository::remove_dir_atomically(&path, &workdir).await?; + storage::fs::OpenFsRepository::remove_dir_atomically(&path, &workdir) + .await?; } Ok(digest) }; diff --git a/crates/spfs/src/clean_test.rs b/crates/spfs/src/clean_test.rs index b1ea41ec43..48c9fae34d 100644 --- a/crates/spfs/src/clean_test.rs +++ b/crates/spfs/src/clean_test.rs @@ -225,7 +225,7 @@ async fn test_clean_untagged_objects_layers_platforms(#[future] tmprepo: TempRep async fn test_clean_manifest_renders(tmpdir: tempfile::TempDir) { init_logging(); let tmprepo = Arc::new( - storage::fs::FsRepository::create(tmpdir.path()) + storage::fs::MaybeOpenFsRepository::create(tmpdir.path()) .await .unwrap() .into(), @@ -254,12 +254,12 @@ async fn test_clean_manifest_renders(tmpdir: tempfile::TempDir) { }; let fs_repo = fs_repo.opened().await.unwrap(); - storage::fs::Renderer::new(&*fs_repo) + storage::fs::Renderer::new(&fs_repo) .render_manifest(&manifest.to_graph_manifest(), None) .await .unwrap(); - let files = list_files(fs_repo.objects.root()); + let files = list_files(fs_repo.fs_impl.objects.root()); assert!(!files.is_empty(), "should have stored data"); let cleaner = Cleaner::new(&tmprepo).with_reporter(TracingCleanReporter); @@ -269,7 +269,7 @@ async fn test_clean_manifest_renders(tmpdir: tempfile::TempDir) { .expect("failed to clean repo"); println!("{result:#?}"); - let files = list_files(fs_repo.renders.as_ref().unwrap().renders.root()); + let files = list_files(fs_repo.fs_impl.renders.as_ref().unwrap().renders.root()); assert_eq!( files, Vec::::new(), diff --git a/crates/spfs/src/commit_test.rs b/crates/spfs/src/commit_test.rs index efb824cf55..8b1b9f04dd 100644 --- a/crates/spfs/src/commit_test.rs +++ b/crates/spfs/src/commit_test.rs @@ -13,13 +13,13 @@ use crate::Error; async fn test_commit_empty(tmpdir: tempfile::TempDir) { let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(&root) + crate::storage::fs::MaybeOpenFsRepository::create(&root) .await .unwrap(), ); let storage = crate::runtime::Storage::new(repo).unwrap(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); diff --git a/crates/spfs/src/config.rs b/crates/spfs/src/config.rs index 2b15380a47..d4320cd5ff 100644 --- a/crates/spfs/src/config.rs +++ b/crates/spfs/src/config.rs @@ -311,9 +311,9 @@ impl RemoteConfig { inner, } = self; let mut handle: storage::RepositoryHandle = match inner.clone() { - RepositoryConfig::Fs(config) => { - storage::fs::FsRepository::from_config(config).await?.into() - } + RepositoryConfig::Fs(config) => storage::fs::MaybeOpenFsRepository::from_config(config) + .await? + .into(), RepositoryConfig::Tar(config) => storage::tar::TarRepository::from_config(config) .await? .into(), @@ -555,7 +555,8 @@ impl Config { source, })?; - local_repo.set_tag_namespace(self.storage.tag_namespace.clone()); + Arc::make_mut(&mut local_repo.fs_impl) + .set_tag_namespace(self.storage.tag_namespace.clone()); Ok(local_repo) } @@ -564,7 +565,7 @@ impl Config { /// /// The returned repo is guaranteed to be created, valid and open already. Ie /// the local repository is not allowed to be lazily opened. - pub async fn get_local_repository(&self) -> Result { + pub async fn get_local_repository(&self) -> Result { self.get_opened_local_repository().await.map(Into::into) } diff --git a/crates/spfs/src/config_test.rs b/crates/spfs/src/config_test.rs index b65ed0633b..85a3447e32 100644 --- a/crates/spfs/src/config_test.rs +++ b/crates/spfs/src/config_test.rs @@ -41,7 +41,7 @@ async fn test_config_get_remote() { .tempdir() .unwrap(); let remote = tmpdir.path().join("remote"); - let _ = crate::storage::fs::FsRepository::create(&remote) + let _ = crate::storage::fs::MaybeOpenFsRepository::create(&remote) .await .unwrap(); diff --git a/crates/spfs/src/error.rs b/crates/spfs/src/error.rs index db6effce84..f45296fd8b 100644 --- a/crates/spfs/src/error.rs +++ b/crates/spfs/src/error.rs @@ -223,7 +223,7 @@ impl Error { /// Create an [`Error::FailedToOpenRepository`] instance for /// a repository using its address and root cause. - pub fn failed_to_open_repository( + pub fn failed_to_open_repository( repo: &R, source: storage::OpenRepositoryError, ) -> Self { diff --git a/crates/spfs/src/fixtures.rs b/crates/spfs/src/fixtures.rs index af5f6a883a..7cb12a1023 100644 --- a/crates/spfs/src/fixtures.rs +++ b/crates/spfs/src/fixtures.rs @@ -39,12 +39,19 @@ impl TempRepo { { match self { TempRepo::FS(_, tempdir) => { - let mut repo = spfs::storage::fs::FsRepository::open(tempdir.path().join("repo")) - .await - .unwrap(); - repo.set_tag_namespace(Some(spfs::storage::TagNamespaceBuf::new( - namespace.as_ref(), - ))); + let repo = spfs::storage::fs::MaybeOpenFsRepository { + fs_impl: { + let mut fs_impl = spfs::storage::fs::MaybeOpenFsRepositoryImpl::open( + tempdir.path().join("repo"), + ) + .await + .unwrap(); + fs_impl.set_tag_namespace(Some(spfs::storage::TagNamespaceBuf::new( + namespace.as_ref(), + ))); + fs_impl.into() + }, + }; TempRepo::FS(Arc::new(repo.into()), Arc::clone(tempdir)) } _ => panic!("only TempRepo::FS type supports setting tag namespaces"), @@ -136,7 +143,7 @@ pub async fn tmprepo(kind: &str) -> TempRepo { let tmpdir = tmpdir(); match kind { "fs" => { - let repo = spfs::storage::fs::FsRepository::create(tmpdir.path().join("repo")) + let repo = spfs::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("repo")) .await .unwrap() .into(); @@ -153,7 +160,7 @@ pub async fn tmprepo(kind: &str) -> TempRepo { "rpc" => { use crate::storage::prelude::*; let repo = std::sync::Arc::new(spfs::storage::RepositoryHandle::FS( - spfs::storage::fs::FsRepository::create(tmpdir.path().join("repo")) + spfs::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("repo")) .await .unwrap(), )); diff --git a/crates/spfs/src/runtime/storage.rs b/crates/spfs/src/runtime/storage.rs index 2d5be78057..c94a3dfcc9 100644 --- a/crates/spfs/src/runtime/storage.rs +++ b/crates/spfs/src/runtime/storage.rs @@ -1178,7 +1178,7 @@ impl Storage { pub async fn durable_path(&self, name: String) -> Result { match &*self.inner { RepositoryHandle::FS(repo) => { - let mut upper_root_path = repo.root(); + let mut upper_root_path = repo.fs_impl.root(); upper_root_path.push(DURABLE_EDITS_DIR); upper_root_path.push(name); Ok(upper_root_path) diff --git a/crates/spfs/src/runtime/storage_test.rs b/crates/spfs/src/runtime/storage_test.rs index a268b9ce41..205f7e216e 100644 --- a/crates/spfs/src/runtime/storage_test.rs +++ b/crates/spfs/src/runtime/storage_test.rs @@ -35,7 +35,7 @@ fn test_config_serialization() { async fn test_storage_create_runtime(tmpdir: tempfile::TempDir) { let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -73,7 +73,7 @@ async fn test_storage_runtime_with_annotation( let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -139,7 +139,7 @@ async fn test_storage_runtime_add_annotations_list( let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -212,7 +212,7 @@ async fn test_storage_runtime_with_nested_annotation( // Setup the objects needed for the runtime used in the test let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -281,7 +281,7 @@ async fn test_storage_runtime_with_annotation_all( let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -355,7 +355,7 @@ async fn test_storage_runtime_with_nested_annotation_all( // setup the objects needed for the runtime used in the test let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -431,7 +431,7 @@ async fn test_storage_runtime_with_nested_annotation_all( async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) { let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -452,7 +452,7 @@ async fn test_storage_remove_runtime(tmpdir: tempfile::TempDir) { async fn test_storage_iter_runtimes(tmpdir: tempfile::TempDir) { let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -504,7 +504,7 @@ async fn test_storage_iter_runtimes(tmpdir: tempfile::TempDir) { async fn test_runtime_reset(tmpdir: tempfile::TempDir) { let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); @@ -551,7 +551,7 @@ async fn test_runtime_reset(tmpdir: tempfile::TempDir) { async fn test_runtime_ensure_extra_bind_mount_locations_exist(tmpdir: tempfile::TempDir) { let root = tmpdir.path().to_string_lossy().to_string(); let repo = crate::storage::RepositoryHandle::from( - crate::storage::fs::FsRepository::create(root) + crate::storage::fs::MaybeOpenFsRepository::create(root) .await .unwrap(), ); diff --git a/crates/spfs/src/storage/fallback/repository.rs b/crates/spfs/src/storage/fallback/repository.rs index 8523161de6..46ba059da7 100644 --- a/crates/spfs/src/storage/fallback/repository.rs +++ b/crates/spfs/src/storage/fallback/repository.rs @@ -361,7 +361,8 @@ impl TagStorageMut for FallbackProxy { &mut self, tag_namespace: Option, ) -> Result> { - Ok(Arc::make_mut(&mut self.primary).set_tag_namespace(tag_namespace)) + Ok(Arc::make_mut(&mut Arc::make_mut(&mut self.primary).fs_impl) + .set_tag_namespace(tag_namespace)) } } @@ -386,12 +387,12 @@ impl Address for FallbackProxy { impl LocalRepository for FallbackProxy { #[inline] fn payloads(&self) -> &FsHashStore { - self.primary.payloads() + self.primary.fs_impl.payloads() } #[inline] fn render_store(&self) -> Result<&RenderStore> { - self.primary.render_store() + self.primary.fs_impl.render_store() } } diff --git a/crates/spfs/src/storage/fallback/repository_test.rs b/crates/spfs/src/storage/fallback/repository_test.rs index 3d9e150926..4167e63609 100644 --- a/crates/spfs/src/storage/fallback/repository_test.rs +++ b/crates/spfs/src/storage/fallback/repository_test.rs @@ -36,7 +36,7 @@ async fn test_proxy_payload_repair(tmpdir: tempfile::TempDir) { .unwrap(); // Delete the payload file from the primary repo. - let payload_path = primary.payloads.build_digest_path(&digest); + let payload_path = primary.fs_impl.payloads.build_digest_path(&digest); tokio::fs::remove_file(payload_path).await.unwrap(); // Loading the payload from the primary should fail. diff --git a/crates/spfs/src/storage/fs/database.rs b/crates/spfs/src/storage/fs/database.rs index b45f86f2ff..472fec7871 100644 --- a/crates/spfs/src/storage/fs/database.rs +++ b/crates/spfs/src/storage/fs/database.rs @@ -16,7 +16,7 @@ use crate::graph::{DatabaseView, Object, ObjectProto}; use crate::{encoding, graph, Error, Result}; #[async_trait::async_trait] -impl DatabaseView for super::FsRepository { +impl DatabaseView for super::MaybeOpenFsRepository { async fn has_object(&self, digest: encoding::Digest) -> bool { let Ok(opened) = self.opened().await else { return false; @@ -55,7 +55,7 @@ impl DatabaseView for super::FsRepository { } #[async_trait::async_trait] -impl graph::Database for super::FsRepository { +impl graph::Database for super::MaybeOpenFsRepository { async fn remove_object(&self, digest: encoding::Digest) -> crate::Result<()> { self.opened().await?.remove_object(digest).await } @@ -73,7 +73,7 @@ impl graph::Database for super::FsRepository { } #[async_trait::async_trait] -impl graph::DatabaseExt for super::FsRepository { +impl graph::DatabaseExt for super::MaybeOpenFsRepository { async fn write_object(&self, obj: &graph::FlatObject) -> Result<()> { self.opened().await?.write_object(obj).await } @@ -82,12 +82,12 @@ impl graph::DatabaseExt for super::FsRepository { #[async_trait::async_trait] impl DatabaseView for super::OpenFsRepository { async fn has_object(&self, digest: encoding::Digest) -> bool { - let filepath = self.objects.build_digest_path(&digest); + let filepath = self.fs_impl.objects.build_digest_path(&digest); tokio::fs::symlink_metadata(filepath).await.is_ok() } async fn read_object(&self, digest: encoding::Digest) -> Result { - let filepath = self.objects.build_digest_path(&digest); + let filepath = self.fs_impl.objects.build_digest_path(&digest); let mut file = tokio::io::BufReader::new(tokio::fs::File::open(&filepath).await.map_err(|err| { match err.kind() { @@ -109,7 +109,7 @@ impl DatabaseView for super::OpenFsRepository { &self, search_criteria: graph::DigestSearchCriteria, ) -> Pin> + Send>> { - Box::pin(self.objects.find(search_criteria)) + Box::pin(self.fs_impl.objects.find(search_criteria)) } fn iter_objects(&self) -> graph::DatabaseIterator<'_> { @@ -124,14 +124,14 @@ impl DatabaseView for super::OpenFsRepository { &self, partial: &encoding::PartialDigest, ) -> Result { - self.objects.resolve_full_digest(partial).await + self.fs_impl.objects.resolve_full_digest(partial).await } } #[async_trait::async_trait] impl graph::Database for super::OpenFsRepository { async fn remove_object(&self, digest: encoding::Digest) -> crate::Result<()> { - let filepath = self.objects.build_digest_path(&digest); + let filepath = self.fs_impl.objects.build_digest_path(&digest); // this might fail but we don't consider that fatal just yet #[cfg(unix)] @@ -156,7 +156,7 @@ impl graph::Database for super::OpenFsRepository { older_than: DateTime, digest: encoding::Digest, ) -> crate::Result { - let filepath = self.objects.build_digest_path(&digest); + let filepath = self.fs_impl.objects.build_digest_path(&digest); // this might fail but we don't consider that fatal just yet #[cfg(unix)] @@ -203,7 +203,7 @@ impl graph::Database for super::OpenFsRepository { impl graph::DatabaseExt for super::OpenFsRepository { async fn write_object(&self, obj: &graph::FlatObject) -> Result<()> { let digest = obj.digest()?; - let filepath = self.objects.build_digest_path(&digest); + let filepath = self.fs_impl.objects.build_digest_path(&digest); if filepath.exists() { tracing::trace!(%digest, kind=%std::any::type_name::(), "object already exists"); return Ok(()); @@ -214,8 +214,8 @@ impl graph::DatabaseExt for super::OpenFsRepository { // other processes don't try to read our incomplete // object from the database let uuid = uuid::Uuid::new_v4().to_string(); - let working_file = self.objects.workdir().join(uuid); - self.objects.ensure_base_dir(&working_file)?; + let working_file = self.fs_impl.objects.workdir().join(uuid); + self.fs_impl.objects.ensure_base_dir(&working_file)?; let mut encoded = Vec::new(); obj.encode(&mut encoded)?; let mut writer = tokio::io::BufWriter::new( @@ -258,7 +258,7 @@ impl graph::DatabaseExt for super::OpenFsRepository { } #[cfg(unix)] { - let perms = std::fs::Permissions::from_mode(self.objects.file_permissions); + let perms = std::fs::Permissions::from_mode(self.fs_impl.objects.file_permissions); if let Err(err) = tokio::fs::set_permissions(&working_file, perms).await { let _ = tokio::fs::remove_file(&working_file).await; return Err(Error::StorageWriteError( @@ -268,7 +268,7 @@ impl graph::DatabaseExt for super::OpenFsRepository { )); } } - self.objects.ensure_base_dir(&filepath)?; + self.fs_impl.objects.ensure_base_dir(&filepath)?; match tokio::fs::rename(&working_file, &filepath).await { Ok(_) => Ok(()), Err(err) => { diff --git a/crates/spfs/src/storage/fs/mod.rs b/crates/spfs/src/storage/fs/mod.rs index 0139e87887..9aff0138aa 100644 --- a/crates/spfs/src/storage/fs/mod.rs +++ b/crates/spfs/src/storage/fs/mod.rs @@ -31,10 +31,13 @@ pub use renderer::{ DEFAULT_MAX_CONCURRENT_BLOBS, DEFAULT_MAX_CONCURRENT_BRANCHES, }; +#[cfg(test)] +pub use repository::MaybeOpenFsRepositoryImpl; pub use repository::{ read_last_migration_version, Config, - FsRepository, + FsRepositoryOps, + MaybeOpenFsRepository, OpenFsRepository, Params, RenderStore, diff --git a/crates/spfs/src/storage/fs/payloads.rs b/crates/spfs/src/storage/fs/payloads.rs index ad0cfad78f..28661ef728 100644 --- a/crates/spfs/src/storage/fs/payloads.rs +++ b/crates/spfs/src/storage/fs/payloads.rs @@ -8,13 +8,13 @@ use std::pin::Pin; use futures::future::ready; use futures::{Stream, StreamExt, TryFutureExt}; -use super::{FsRepository, OpenFsRepository}; +use super::{MaybeOpenFsRepository, OpenFsRepository}; use crate::storage::prelude::*; use crate::tracking::BlobRead; use crate::{encoding, graph, Error, Result}; #[async_trait::async_trait] -impl crate::storage::PayloadStorage for FsRepository { +impl crate::storage::PayloadStorage for MaybeOpenFsRepository { async fn has_payload(&self, digest: encoding::Digest) -> bool { let Ok(opened) = self.opened().await else { return false; @@ -54,26 +54,26 @@ impl crate::storage::PayloadStorage for FsRepository { #[async_trait::async_trait] impl crate::storage::PayloadStorage for OpenFsRepository { async fn has_payload(&self, digest: encoding::Digest) -> bool { - let path = self.payloads.build_digest_path(&digest); + let path = self.fs_impl.payloads.build_digest_path(&digest); tokio::fs::symlink_metadata(path).await.is_ok() } fn iter_payload_digests(&self) -> Pin> + Send>> { - Box::pin(self.payloads.iter()) + Box::pin(self.fs_impl.payloads.iter()) } async unsafe fn write_data( &self, reader: Pin>, ) -> Result<(encoding::Digest, u64)> { - self.payloads.write_data(reader).await + self.fs_impl.payloads.write_data(reader).await } async fn open_payload( &self, digest: encoding::Digest, ) -> Result<(Pin>, std::path::PathBuf)> { - let path = self.payloads.build_digest_path(&digest); + let path = self.fs_impl.payloads.build_digest_path(&digest); match tokio::fs::File::open(&path).await { Ok(file) => Ok((Box::pin(tokio::io::BufReader::new(file)), path)), Err(err) => match err.kind() { @@ -97,7 +97,7 @@ impl crate::storage::PayloadStorage for OpenFsRepository { } async fn remove_payload(&self, digest: encoding::Digest) -> Result<()> { - let path = self.payloads.build_digest_path(&digest); + let path = self.fs_impl.payloads.build_digest_path(&digest); match tokio::fs::remove_file(&path).await { Ok(()) => Ok(()), Err(err) => match err.kind() { diff --git a/crates/spfs/src/storage/fs/renderer.rs b/crates/spfs/src/storage/fs/renderer.rs index 5378b23fa9..12a3d705b0 100644 --- a/crates/spfs/src/storage/fs/renderer.rs +++ b/crates/spfs/src/storage/fs/renderer.rs @@ -49,14 +49,14 @@ pub enum RenderType { impl OpenFsRepository { fn get_render_storage(&self) -> Result<&crate::storage::fs::FsHashStore> { - match &self.renders { + match &self.fs_impl.renders { Some(render_store) => Ok(&render_store.renders), - None => Err(Error::NoRenderStorage(self.address())), + None => Err(Error::NoRenderStorage(self.address().into_owned())), } } pub async fn has_rendered_manifest(&self, digest: encoding::Digest) -> bool { - let renders = match &self.renders { + let renders = match &self.fs_impl.renders { Some(render_store) => &render_store.renders, None => return false, }; @@ -76,14 +76,15 @@ impl OpenFsRepository { } pub fn proxy_path(&self) -> Option<&std::path::Path> { - self.renders + self.fs_impl + .renders .as_ref() .map(|render_store| render_store.proxy.root()) } /// Remove the identified render from this storage. pub async fn remove_rendered_manifest(&self, digest: crate::encoding::Digest) -> Result<()> { - let renders = match &self.renders { + let renders = match &self.fs_impl.renders { Some(render_store) => &render_store.renders, None => return Ok(()), }; @@ -122,7 +123,7 @@ impl OpenFsRepository { older_than: DateTime, digest: encoding::Digest, ) -> Result { - let renders = match &self.renders { + let renders = match &self.fs_impl.renders { Some(render_store) => &render_store.renders, None => return Ok(false), }; diff --git a/crates/spfs/src/storage/fs/renderer_test.rs b/crates/spfs/src/storage/fs/renderer_test.rs index 446ed5ecaf..7b839e4de3 100644 --- a/crates/spfs/src/storage/fs/renderer_test.rs +++ b/crates/spfs/src/storage/fs/renderer_test.rs @@ -10,7 +10,7 @@ use super::was_render_completed; use crate::encoding::prelude::*; use crate::fixtures::*; use crate::graph::object::{DigestStrategy, EncodingFormat}; -use crate::storage::fs::{FsRepository, OpenFsRepository}; +use crate::storage::fs::{MaybeOpenFsRepository, OpenFsRepository}; use crate::storage::{RepositoryExt, RepositoryHandle}; use crate::{tracking, Config}; @@ -83,7 +83,7 @@ async fn test_render_manifest_with_repo( config.make_current().unwrap(); let tmprepo = Arc::new( - FsRepository::create(tmpdir.path().join("repo")) + MaybeOpenFsRepository::create(tmpdir.path().join("repo")) .await .unwrap() .into(), @@ -107,13 +107,14 @@ async fn test_render_manifest_with_repo( }; let render = tmprepo + .fs_impl .renders .as_ref() .unwrap() .renders .build_digest_path(&manifest.digest().unwrap()); assert!(!render.exists(), "render should NOT be seen as existing"); - super::Renderer::new(&*tmprepo) + super::Renderer::new(&tmprepo) .render_manifest(&manifest, None) .await .unwrap(); diff --git a/crates/spfs/src/storage/fs/repository.rs b/crates/spfs/src/storage/fs/repository.rs index 829b36b009..255ba97b4f 100644 --- a/crates/spfs/src/storage/fs/repository.rs +++ b/crates/spfs/src/storage/fs/repository.rs @@ -9,9 +9,13 @@ use std::io::Write; #[cfg(unix)] use std::os::unix::prelude::PermissionsExt; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::sync::Arc; use arc_swap::ArcSwap; +use async_stream::try_stream; +use chrono::{DateTime, Utc}; +use futures::Stream; use super::hash_store::PROXY_DIRNAME; use super::migrations::{MigrationError, MigrationResult}; @@ -114,6 +118,204 @@ impl Clone for RenderStore { } } +/// Operations on a FsRepository. +#[async_trait::async_trait] +pub trait FsRepositoryOps: Send + Sync { + /// True if this repo is setup to generate local manifest renders. + fn has_renders(&self) -> bool; + + fn iter_rendered_manifests( + &self, + ) -> Pin> + Send + Sync + '_>>; + + fn proxy_path(&self) -> Option<&std::path::Path>; + + /// Remove the identified render from this storage. + async fn remove_rendered_manifest(&self, digest: crate::encoding::Digest) -> Result<()>; + + /// Returns true if the render was actually removed + async fn remove_rendered_manifest_if_older_than( + &self, + older_than: DateTime, + digest: crate::encoding::Digest, + ) -> Result; + + /// Returns a list of the render storage for all the users + /// with renders found in the repository, if any. + /// + /// Returns tuples of (username, `FsRepositoryOps`). + fn renders_for_all_users(&self) -> Result>; +} + +#[async_trait::async_trait] +impl FsRepositoryOps for &T +where + T: FsRepositoryOps, +{ + fn has_renders(&self) -> bool { + T::has_renders(*self) + } + + fn iter_rendered_manifests( + &self, + ) -> Pin> + Send + Sync + '_>> { + T::iter_rendered_manifests(*self) + } + + fn proxy_path(&self) -> Option<&std::path::Path> { + T::proxy_path(*self) + } + + async fn remove_rendered_manifest(&self, digest: crate::encoding::Digest) -> Result<()> { + T::remove_rendered_manifest(*self, digest).await + } + + async fn remove_rendered_manifest_if_older_than( + &self, + older_than: DateTime, + digest: crate::encoding::Digest, + ) -> Result { + T::remove_rendered_manifest_if_older_than(*self, older_than, digest).await + } + + fn renders_for_all_users(&self) -> Result> { + T::renders_for_all_users(*self) + } +} + +/// A pure filesystem-based repository of spfs data. +#[derive(Clone, Debug)] +pub struct FsRepository { + pub(crate) fs_impl: Arc, +} + +#[async_trait::async_trait] +impl FsRepositoryOps for FsRepository +where + FS: FsRepositoryOps, +{ + fn has_renders(&self) -> bool { + self.fs_impl.has_renders() + } + + fn iter_rendered_manifests( + &self, + ) -> Pin> + Send + Sync + '_>> { + self.fs_impl.iter_rendered_manifests() + } + + fn proxy_path(&self) -> Option<&std::path::Path> { + self.fs_impl.proxy_path() + } + + async fn remove_rendered_manifest(&self, digest: crate::encoding::Digest) -> Result<()> { + self.fs_impl.remove_rendered_manifest(digest).await + } + + async fn remove_rendered_manifest_if_older_than( + &self, + older_than: DateTime, + digest: crate::encoding::Digest, + ) -> Result { + self.fs_impl + .remove_rendered_manifest_if_older_than(older_than, digest) + .await + } + + fn renders_for_all_users(&self) -> Result> { + self.fs_impl.renders_for_all_users() + } +} + +impl Address for FsRepository +where + FS: Address, +{ + fn address(&self) -> Cow<'_, url::Url> { + self.fs_impl.address() + } +} + +impl LocalRepository for FsRepository +where + FS: LocalRepository, +{ + fn payloads(&self) -> &FsHashStore { + self.fs_impl.payloads() + } + + fn render_store(&self) -> Result<&RenderStore> { + self.fs_impl.render_store() + } +} + +pub type MaybeOpenFsRepository = FsRepository; +pub type OpenFsRepository = FsRepository; + +impl MaybeOpenFsRepository { + /// Get the opened version of this repository, performing + /// any required opening and validation as needed + pub fn opened(&self) -> impl futures::Future> + 'static { + let fs_impl = Arc::clone(&self.fs_impl); + async move { + let fs_impl = fs_impl + .opened_and_map_err(Error::failed_to_open_repository) + .await?; + Ok(OpenFsRepository { fs_impl }) + } + } + + /// Open a filesystem repository, creating it if necessary + pub async fn create>(root: P) -> OpenRepositoryResult { + MaybeOpenFsRepositoryImpl::create(root) + .await + .map(Into::into) + .map(|fs_impl| FsRepository { fs_impl }) + } +} + +#[async_trait::async_trait] +impl FromConfig for MaybeOpenFsRepository { + type Config = Config; + + async fn from_config(config: Self::Config) -> crate::storage::OpenRepositoryResult { + MaybeOpenFsRepositoryImpl::from_config(config) + .await + .map(Into::into) + .map(|fs_impl| FsRepository { fs_impl }) + } +} + +impl OpenFsRepository { + /// Establish a new filesystem repository + pub async fn create>(root: P) -> OpenRepositoryResult { + OpenFsRepositoryImpl::create(root) + .await + .map(Into::into) + .map(|fs_impl| FsRepository { fs_impl }) + } +} + +impl From for MaybeOpenFsRepository { + fn from(value: OpenFsRepository) -> Self { + MaybeOpenFsRepository { + fs_impl: Arc::new(MaybeOpenFsRepositoryImpl(Arc::new(ArcSwap::new(Arc::new( + InnerFsRepository::Open(value.fs_impl), + ))))), + } + } +} + +impl From> for MaybeOpenFsRepository { + fn from(value: Arc) -> Self { + MaybeOpenFsRepository { + fs_impl: Arc::new(MaybeOpenFsRepositoryImpl(Arc::new(ArcSwap::new(Arc::new( + InnerFsRepository::Open(Arc::clone(&value.fs_impl)), + ))))), + } + } +} + /// A pure filesystem-based repository of spfs data. /// /// This instance can be already validated and open or @@ -122,21 +324,21 @@ impl Clone for RenderStore { /// An [`OpenFsRepository`] is more useful than this one, but /// can also be easily retrieved via the [`Self::opened`]. #[derive(Clone)] -pub struct FsRepository(Arc>); +pub struct MaybeOpenFsRepositoryImpl(Arc>); enum InnerFsRepository { Closed(Config), - Open(Arc), + Open(Arc), } -impl From for FsRepository { - fn from(value: OpenFsRepository) -> Self { +impl From for MaybeOpenFsRepositoryImpl { + fn from(value: OpenFsRepositoryImpl) -> Self { Arc::new(value).into() } } -impl From> for FsRepository { - fn from(value: Arc) -> Self { +impl From> for MaybeOpenFsRepositoryImpl { + fn from(value: Arc) -> Self { Self(Arc::new(ArcSwap::new(Arc::new(InnerFsRepository::Open( value, ))))) @@ -144,7 +346,7 @@ impl From> for FsRepository { } #[async_trait::async_trait] -impl FromConfig for FsRepository { +impl FromConfig for MaybeOpenFsRepositoryImpl { type Config = Config; async fn from_config(config: Self::Config) -> crate::storage::OpenRepositoryResult { @@ -153,16 +355,16 @@ impl FromConfig for FsRepository { InnerFsRepository::Closed(config), ))))) } else { - Ok(OpenFsRepository::from_config(config).await?.into()) + Ok(OpenFsRepositoryImpl::from_config(config).await?.into()) } } } -impl FsRepository { +impl MaybeOpenFsRepositoryImpl { /// Open a filesystem repository, creating it if necessary pub async fn create>(root: P) -> OpenRepositoryResult { Ok(Self(Arc::new(ArcSwap::new(Arc::new( - InnerFsRepository::Open(Arc::new(OpenFsRepository::create(root).await?)), + InnerFsRepository::Open(Arc::new(OpenFsRepositoryImpl::create(root).await?)), ))))) } @@ -171,13 +373,15 @@ impl FsRepository { pub async fn open>(root: P) -> OpenRepositoryResult { let root = root.as_ref(); Ok(Self(Arc::new(ArcSwap::new(Arc::new( - InnerFsRepository::Open(Arc::new(OpenFsRepository::open(&root).await?)), + InnerFsRepository::Open(Arc::new(OpenFsRepositoryImpl::open(&root).await?)), ))))) } /// Get the opened version of this repository, performing /// any required opening and validation as needed - pub fn opened(&self) -> impl futures::Future>> + 'static { + pub fn opened( + &self, + ) -> impl futures::Future>> + 'static { self.opened_and_map_err(Error::failed_to_open_repository) } @@ -185,14 +389,15 @@ impl FsRepository { /// any required opening and validation as needed pub fn try_open( &self, - ) -> impl futures::Future>> + 'static { + ) -> impl futures::Future>> + 'static + { self.opened_and_map_err(|_, e| e) } fn opened_and_map_err( &self, map: F, - ) -> impl futures::Future, E>> + 'static + ) -> impl futures::Future, E>> + 'static where F: FnOnce(&Self, OpenRepositoryError) -> E + 'static, { @@ -201,7 +406,7 @@ impl FsRepository { match &**inner.load() { InnerFsRepository::Closed(config) => { let config = config.clone(); - let opened = match OpenFsRepository::from_config(config).await { + let opened = match OpenFsRepositoryImpl::from_config(config).await { Ok(o) => Arc::new(o), Err(err) => return Err(map(&Self(inner), err)), }; @@ -258,20 +463,20 @@ impl FsRepository { } } -impl Address for FsRepository { +impl Address for MaybeOpenFsRepositoryImpl { fn address(&self) -> Cow<'_, url::Url> { Cow::Owned(url::Url::from_directory_path(self.root()).unwrap()) } } -impl std::fmt::Debug for FsRepository { +impl std::fmt::Debug for MaybeOpenFsRepositoryImpl { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("FsRepository @ {:?}", self.root())) } } /// A validated and opened fs repository. -pub struct OpenFsRepository { +pub struct OpenFsRepositoryImpl { root: PathBuf, /// the namespace to use for tag resolution. If set, then this is treated /// as "chroot" of the real tag root. @@ -285,7 +490,7 @@ pub struct OpenFsRepository { } #[async_trait::async_trait] -impl FromConfig for OpenFsRepository { +impl FromConfig for OpenFsRepositoryImpl { type Config = Config; async fn from_config(config: Self::Config) -> crate::storage::OpenRepositoryResult { @@ -301,7 +506,7 @@ impl FromConfig for OpenFsRepository { } } -impl Clone for OpenFsRepository { +impl Clone for OpenFsRepositoryImpl { fn clone(&self) -> Self { let root = self.root.clone(); Self { @@ -314,7 +519,7 @@ impl Clone for OpenFsRepository { } } -impl LocalRepository for OpenFsRepository { +impl LocalRepository for OpenFsRepositoryImpl { #[inline] fn payloads(&self) -> &FsHashStore { &self.payloads @@ -328,7 +533,7 @@ impl LocalRepository for OpenFsRepository { } } -impl OpenFsRepository { +impl OpenFsRepositoryImpl { /// The address of this repository that can be used to re-open it pub fn address(&self) -> url::Url { Config { @@ -343,11 +548,6 @@ impl OpenFsRepository { .expect("repository address is valid") } - /// The filesystem root path of this repository - pub fn root(&self) -> PathBuf { - self.root.clone() - } - /// Establish a new filesystem repository pub async fn create>(root: P) -> OpenRepositoryResult { let root = root.as_ref(); @@ -386,19 +586,27 @@ impl OpenFsRepository { unsafe { Self::open_unchecked(root) } } + pub(crate) fn get_render_storage(&self) -> Result<&crate::storage::fs::FsHashStore> { + match &self.renders { + Some(render_store) => Ok(&render_store.renders), + None => Err(Error::NoRenderStorage(self.address())), + } + } + /// Return the configured tag namespace, if any. #[inline] pub fn get_tag_namespace(&self) -> Option> { self.tag_namespace.as_deref().map(Cow::Borrowed) } - /// Set the configured tag namespace, returning the old tag namespace, - /// if there was one. - pub fn set_tag_namespace( - &mut self, - tag_namespace: Option, - ) -> Option { - std::mem::replace(&mut self.tag_namespace, tag_namespace) + /// The latest repository version that this was migrated to. + pub async fn last_migration(&self) -> MigrationResult { + Ok(read_last_migration_version(self.root()) + .await? + .unwrap_or_else(|| { + semver::Version::parse(crate::VERSION) + .expect("crate::VERSION is a valid semver value") + })) } // Open a repository over the given directory, which must already @@ -454,14 +662,9 @@ impl OpenFsRepository { }) } - /// The latest repository version that this was migrated to. - pub async fn last_migration(&self) -> MigrationResult { - Ok(read_last_migration_version(self.root()) - .await? - .unwrap_or_else(|| { - semver::Version::parse(crate::VERSION) - .expect("crate::VERSION is a valid semver value") - })) + /// The filesystem root path of this repository + pub fn root(&self) -> PathBuf { + self.root.clone() } /// Sets the latest version of this repository. @@ -471,16 +674,97 @@ impl OpenFsRepository { set_last_migration(self.root(), Some(version)).await } + /// Set the configured tag namespace, returning the old tag namespace, + /// if there was one. + pub fn set_tag_namespace( + &mut self, + tag_namespace: Option, + ) -> Option { + std::mem::replace(&mut self.tag_namespace, tag_namespace) + } +} + +#[async_trait::async_trait] +impl FsRepositoryOps for OpenFsRepositoryImpl { /// True if this repo is setup to generate local manifest renders. - pub fn has_renders(&self) -> bool { + fn has_renders(&self) -> bool { self.renders.is_some() } + fn iter_rendered_manifests( + &self, + ) -> Pin> + Send + Sync + '_>> { + Box::pin(try_stream! { + let renders = self.get_render_storage()?; + for await digest in renders.iter() { + yield digest?; + } + }) + } + + fn proxy_path(&self) -> Option<&std::path::Path> { + self.renders + .as_ref() + .map(|render_store| render_store.proxy.root()) + } + + async fn remove_rendered_manifest(&self, digest: crate::encoding::Digest) -> Result<()> { + let renders = match &self.renders { + Some(render_store) => &render_store.renders, + None => return Ok(()), + }; + let rendered_dirpath = renders.build_digest_path(&digest); + let workdir = renders.workdir(); + makedirs_with_perms(&workdir, renders.directory_permissions).map_err(|source| { + Error::StorageWriteError("remove render create workdir", workdir.clone(), source) + })?; + OpenFsRepository::remove_dir_atomically(&rendered_dirpath, &workdir).await + } + + async fn remove_rendered_manifest_if_older_than( + &self, + older_than: DateTime, + digest: crate::encoding::Digest, + ) -> Result { + let renders = match &self.renders { + Some(render_store) => &render_store.renders, + None => return Ok(false), + }; + let rendered_dirpath = renders.build_digest_path(&digest); + + let metadata = match tokio::fs::symlink_metadata(&rendered_dirpath).await { + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false), + Err(err) => { + return Err(Error::StorageReadError( + "symlink_metadata on rendered dir path", + rendered_dirpath.clone(), + err, + )) + } + Ok(metadata) => metadata, + }; + + let mtime = metadata.modified().map_err(|err| { + Error::StorageReadError( + "modified on symlink metadata of rendered dir path", + rendered_dirpath.clone(), + err, + ) + })?; + + if DateTime::::from(mtime) >= older_than { + return Ok(false); + } + + self.remove_rendered_manifest(digest).await?; + Ok(true) + } + /// Returns a list of the render storage for all the users /// with renders found in the repository, if any. /// /// Returns tuples of (username, `ManifestViewer`). - pub fn renders_for_all_users(&self) -> Result> { + fn renders_for_all_users(&self) -> Result> { if !self.has_renders() { return Ok(Vec::new()); } @@ -530,15 +814,15 @@ impl OpenFsRepository { } } -impl Address for OpenFsRepository { +impl Address for OpenFsRepositoryImpl { fn address(&self) -> Cow<'_, url::Url> { Cow::Owned(url::Url::from_directory_path(self.root()).unwrap()) } } -impl std::fmt::Debug for OpenFsRepository { +impl std::fmt::Debug for OpenFsRepositoryImpl { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("OpenFsRepository @ {:?}", self.root())) + f.write_fmt(format_args!("OpenFsRepositoryImpl @ {:?}", self.root())) } } diff --git a/crates/spfs/src/storage/fs/tag.rs b/crates/spfs/src/storage/fs/tag.rs index 96cc400641..d691b7f2c5 100644 --- a/crates/spfs/src/storage/fs/tag.rs +++ b/crates/spfs/src/storage/fs/tag.rs @@ -10,6 +10,7 @@ use std::mem::size_of; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use std::pin::Pin; +use std::sync::Arc; use std::task::Poll; use close_err::Closable; @@ -19,7 +20,7 @@ use futures::{Future, Stream, StreamExt, TryFutureExt}; use relative_path::RelativePath; use tokio::io::{AsyncRead, AsyncSeek, AsyncWriteExt, ReadBuf}; -use super::{FsRepository, OpenFsRepository}; +use super::{MaybeOpenFsRepository, OpenFsRepository}; use crate::storage::tag::{EntryType, TagSpecAndTagStream, TagStream}; use crate::storage::{ TagNamespace, @@ -33,10 +34,10 @@ use crate::{encoding, tracking, Error, OsError, OsErrorExt, Result}; const TAG_EXT: &str = "tag"; #[async_trait::async_trait] -impl TagStorage for FsRepository { +impl TagStorage for MaybeOpenFsRepository { #[inline] fn get_tag_namespace(&self) -> Option> { - Self::get_tag_namespace(self) + self.fs_impl.get_tag_namespace() } fn ls_tags_in_namespace( @@ -130,7 +131,7 @@ impl TagStorage for FsRepository { } } -impl FsRepository { +impl MaybeOpenFsRepository { /// Forcefully remove any lock file for the identified tag. /// /// # Safety @@ -155,7 +156,7 @@ impl FsRepository { impl OpenFsRepository { fn tags_root_in_namespace(&self, namespace: Option<&TagNamespace>) -> PathBuf { - let mut tags_root = self.root().join("tags"); + let mut tags_root = self.fs_impl.root().join("tags"); if let Some(tag_namespace) = namespace { for component in tag_namespace.as_rel_path().components() { // Assuming the tag namespace is only made up of `Normal` @@ -194,7 +195,7 @@ impl OpenFsRepository { impl TagStorage for OpenFsRepository { #[inline] fn get_tag_namespace(&self) -> Option> { - Self::get_tag_namespace(self) + self.fs_impl.get_tag_namespace() } fn ls_tags_in_namespace( @@ -447,12 +448,12 @@ impl TagStorage for OpenFsRepository { } } -impl TagStorageMut for FsRepository { +impl TagStorageMut for MaybeOpenFsRepository { fn try_set_tag_namespace( &mut self, tag_namespace: Option, ) -> Result> { - Ok(Self::set_tag_namespace(self, tag_namespace)) + Ok(Arc::make_mut(&mut self.fs_impl).set_tag_namespace(tag_namespace)) } } diff --git a/crates/spfs/src/storage/handle.rs b/crates/spfs/src/storage/handle.rs index 597ba0b4ac..dc701bd504 100644 --- a/crates/spfs/src/storage/handle.rs +++ b/crates/spfs/src/storage/handle.rs @@ -21,7 +21,7 @@ use crate::{graph, Error, Result}; #[derive(Debug)] #[allow(clippy::large_enum_variant)] pub enum RepositoryHandle { - FS(super::fs::FsRepository), + FS(super::fs::MaybeOpenFsRepository), Tar(super::tar::TarRepository), Rpc(super::rpc::RpcRepository), FallbackProxy(Box), @@ -80,8 +80,8 @@ impl RepositoryHandle { } } -impl From for RepositoryHandle { - fn from(repo: super::fs::FsRepository) -> Self { +impl From for RepositoryHandle { + fn from(repo: super::fs::MaybeOpenFsRepository) -> Self { RepositoryHandle::FS(repo) } } diff --git a/crates/spfs/src/storage/proxy/repository_test.rs b/crates/spfs/src/storage/proxy/repository_test.rs index c37ece72d9..d2e3479fcd 100644 --- a/crates/spfs/src/storage/proxy/repository_test.rs +++ b/crates/spfs/src/storage/proxy/repository_test.rs @@ -12,12 +12,13 @@ use crate::prelude::*; async fn test_proxy_payload_read_through(tmpdir: tempfile::TempDir) { init_logging(); - let primary = crate::storage::fs::FsRepository::create(tmpdir.path().join("primary")) - .await - .unwrap(); - let secondary = crate::storage::fs::FsRepository::create(tmpdir.path().join("secondary")) + let primary = crate::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("primary")) .await .unwrap(); + let secondary = + crate::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("secondary")) + .await + .unwrap(); let digest = secondary .commit_blob(Box::pin(b"some data".as_slice())) @@ -40,12 +41,13 @@ async fn test_proxy_payload_read_through(tmpdir: tempfile::TempDir) { async fn test_proxy_object_read_through(tmpdir: tempfile::TempDir) { init_logging(); - let primary = crate::storage::fs::FsRepository::create(tmpdir.path().join("primary")) - .await - .unwrap(); - let secondary = crate::storage::fs::FsRepository::create(tmpdir.path().join("secondary")) + let primary = crate::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("primary")) .await .unwrap(); + let secondary = + crate::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("secondary")) + .await + .unwrap(); let payload = secondary .commit_blob(Box::pin(b"some data".as_slice())) @@ -68,12 +70,13 @@ async fn test_proxy_object_read_through(tmpdir: tempfile::TempDir) { async fn test_proxy_tag_read_through(tmpdir: tempfile::TempDir) { init_logging(); - let primary = crate::storage::fs::FsRepository::create(tmpdir.path().join("primary")) - .await - .unwrap(); - let secondary = crate::storage::fs::FsRepository::create(tmpdir.path().join("secondary")) + let primary = crate::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("primary")) .await .unwrap(); + let secondary = + crate::storage::fs::MaybeOpenFsRepository::create(tmpdir.path().join("secondary")) + .await + .unwrap(); let payload = secondary .commit_blob(Box::pin(b"some data".as_slice())) diff --git a/crates/spfs/src/storage/repository_test.rs b/crates/spfs/src/storage/repository_test.rs index 2a0c9add51..9a78ec5ef2 100644 --- a/crates/spfs/src/storage/repository_test.rs +++ b/crates/spfs/src/storage/repository_test.rs @@ -68,7 +68,7 @@ async fn test_commit_mode_fs(tmpdir: tempfile::TempDir) { init_logging(); let dir = tmpdir.path(); let tmprepo = Arc::new( - fs::FsRepository::create(dir.join("repo")) + fs::MaybeOpenFsRepository::create(dir.join("repo")) .await .unwrap() .into(), @@ -94,7 +94,7 @@ async fn test_commit_mode_fs(tmpdir: tempfile::TempDir) { _ => panic!("Unexpected tmprepo type!"), }; - let rendered_dir = fs::Renderer::new(&*tmprepo) + let rendered_dir = fs::Renderer::new(&tmprepo) .render_manifest(&manifest.to_graph_manifest(), None) .await .expect("failed to render manifest"); @@ -105,7 +105,10 @@ async fn test_commit_mode_fs(tmpdir: tempfile::TempDir) { let symlink_entry = manifest .get_path(symlink_path) .expect("symlink not in manifest"); - let symlink_blob = tmprepo.payloads.build_digest_path(&symlink_entry.object); + let symlink_blob = tmprepo + .fs_impl + .payloads + .build_digest_path(&symlink_entry.object); let blob_mode = symlink_blob.symlink_metadata().unwrap().mode(); assert!( !unix_mode::is_symlink(blob_mode), diff --git a/crates/spfs/src/storage/tag_test.rs b/crates/spfs/src/storage/tag_test.rs index a47f36c3cf..f5156fd372 100644 --- a/crates/spfs/src/storage/tag_test.rs +++ b/crates/spfs/src/storage/tag_test.rs @@ -13,7 +13,7 @@ use tokio_stream::StreamExt; use crate::fixtures::*; #[cfg(unix)] -use crate::storage::fs::FsRepository; +use crate::storage::fs::MaybeOpenFsRepository; use crate::storage::{EntryType, TagStorage}; use crate::{encoding, tracking, Result}; @@ -113,7 +113,7 @@ async fn test_tag_no_duplication( #[rstest] #[tokio::test] async fn test_tag_permissions(tmpdir: tempfile::TempDir) { - let storage = FsRepository::create(tmpdir.path().join("repo")) + let storage = MaybeOpenFsRepository::create(tmpdir.path().join("repo")) .await .unwrap(); let spec = tracking::TagSpec::parse("hello").unwrap(); diff --git a/crates/spfs/src/storage/tar/repository.rs b/crates/spfs/src/storage/tar/repository.rs index 7a9233b1ac..a96b76c3c9 100644 --- a/crates/spfs/src/storage/tar/repository.rs +++ b/crates/spfs/src/storage/tar/repository.rs @@ -68,7 +68,7 @@ pub struct TarRepository { up_to_date: AtomicBool, archive: std::path::PathBuf, repo_dir: tempfile::TempDir, - repo: crate::storage::fs::FsRepository, + repo: crate::storage::fs::MaybeOpenFsRepository, } #[async_trait::async_trait] @@ -158,7 +158,7 @@ impl TarRepository { up_to_date: AtomicBool::new(false), archive: path, repo_dir: tmpdir, - repo: crate::storage::fs::FsRepository::create(&repo_path).await?, + repo: crate::storage::fs::MaybeOpenFsRepository::create(&repo_path).await?, }) } diff --git a/crates/spfs/src/sync_test.rs b/crates/spfs/src/sync_test.rs index 4f618a9257..fd8813c752 100644 --- a/crates/spfs/src/sync_test.rs +++ b/crates/spfs/src/sync_test.rs @@ -274,11 +274,11 @@ async fn test_sync_through_tar( #[fixture] async fn config(tmpdir: tempfile::TempDir) -> (tempfile::TempDir, Config) { let repo_path = tmpdir.path().join("repo"); - crate::storage::fs::FsRepository::create(&repo_path) + crate::storage::fs::MaybeOpenFsRepository::create(&repo_path) .await .expect("failed to make repo for test"); let origin_path = tmpdir.path().join("origin"); - crate::storage::fs::FsRepository::create(&origin_path) + crate::storage::fs::MaybeOpenFsRepository::create(&origin_path) .await .expect("failed to make repo for test"); let mut conf = Config::default(); diff --git a/crates/spk-storage/src/fixtures.rs b/crates/spk-storage/src/fixtures.rs index 401ad7437b..fdb6edf3c0 100644 --- a/crates/spk-storage/src/fixtures.rs +++ b/crates/spk-storage/src/fixtures.rs @@ -120,7 +120,7 @@ where let repo = match kind { RepoKind::Spfs => { let storage_root = tmpdir.path().join("repo"); - let spfs_repo = spfs::storage::fs::FsRepository::create(&storage_root) + let spfs_repo = spfs::storage::fs::MaybeOpenFsRepository::create(&storage_root) .await .expect("failed to establish temporary local repo for test"); let written = spfs_repo diff --git a/crates/spk-storage/src/storage/spfs_test.rs b/crates/spk-storage/src/storage/spfs_test.rs index bd85e9b10d..8b417be625 100644 --- a/crates/spk-storage/src/storage/spfs_test.rs +++ b/crates/spk-storage/src/storage/spfs_test.rs @@ -39,7 +39,7 @@ async fn test_metadata_io(tmpdir: tempfile::TempDir) { NormalizedTagStrategy, >::new( "test-repo", - spfs::storage::fs::FsRepository::create(repo_root) + spfs::storage::fs::MaybeOpenFsRepository::create(repo_root) .await .unwrap(), )) @@ -63,7 +63,7 @@ async fn test_upgrade_sets_version(tmpdir: tempfile::TempDir) { NormalizedTagStrategy, >::new( "test-repo", - spfs::storage::fs::FsRepository::create(repo_root) + spfs::storage::fs::MaybeOpenFsRepository::create(repo_root) .await .unwrap(), )) @@ -84,7 +84,7 @@ async fn test_upgrade_sets_version(tmpdir: tempfile::TempDir) { async fn test_upgrade_changes_tags(tmpdir: tempfile::TempDir) { init_logging(); let repo_root = tmpdir.path(); - let spfs_repo = spfs::storage::fs::FsRepository::create(repo_root) + let spfs_repo = spfs::storage::fs::MaybeOpenFsRepository::create(repo_root) .await .unwrap(); let repo = SpfsRepository::::new(