diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/Cargo.toml index 3bcf20f37ef..75a4dd71301 100644 --- a/clients/filesystem-fuse/Cargo.toml +++ b/clients/filesystem-fuse/Cargo.toml @@ -40,6 +40,6 @@ fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] } futures-util = "0.3.30" libc = "0.2.168" log = "0.4.22" +once_cell = "1.20.2" tokio = { version = "1.38.0", features = ["full"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } - diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs b/clients/filesystem-fuse/src/default_raw_filesystem.rs index 9a66cd551f0..0ab92e91640 100644 --- a/clients/filesystem-fuse/src/default_raw_filesystem.rs +++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs @@ -16,14 +16,18 @@ * specific language governing permissions and limitations * under the License. */ -use crate::filesystem::{FileStat, PathFileSystem, RawFileSystem, Result}; +use crate::filesystem::{ + FileStat, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, ROOT_DIR_FILE_ID, + ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH, +}; use crate::opened_file::{FileHandle, OpenFileFlags}; use crate::opened_file_manager::OpenedFileManager; -use crate::utils::join_file_path; use async_trait::async_trait; use bytes::Bytes; use fuse3::{Errno, FileType}; use std::collections::HashMap; +use std::ffi::OsStr; +use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicU64; use tokio::sync::RwLock; @@ -43,16 +47,11 @@ pub struct DefaultRawFileSystem { } impl DefaultRawFileSystem { - const INITIAL_FILE_ID: u64 = 10000; - const ROOT_DIR_PARENT_FILE_ID: u64 = 1; - const ROOT_DIR_FILE_ID: u64 = 1; - const ROOT_DIR_NAME: &'static str = ""; - pub(crate) fn new(fs: T) -> Self { Self { file_entry_manager: RwLock::new(FileEntryManager::new()), opened_file_manager: OpenedFileManager::new(), - file_id_generator: AtomicU64::new(Self::INITIAL_FILE_ID), + file_id_generator: AtomicU64::new(INITIAL_FILE_ID), fs, } } @@ -70,7 +69,7 @@ impl DefaultRawFileSystem { .ok_or(Errno::from(libc::ENOENT)) } - async fn get_file_entry_by_path(&self, path: &str) -> Option { + async fn get_file_entry_by_path(&self, path: &Path) -> Option { self.file_entry_manager .read() .await @@ -123,12 +122,12 @@ impl DefaultRawFileSystem { Ok(file.file_handle()) } - async fn remove_file_entry_locked(&self, path: &str) { + async fn remove_file_entry_locked(&self, path: &Path) { let mut file_manager = self.file_entry_manager.write().await; file_manager.remove(path); } - async fn insert_file_entry_locked(&self, parent_file_id: u64, file_id: u64, path: &str) { + async fn insert_file_entry_locked(&self, parent_file_id: u64, file_id: u64, path: &Path) { let mut file_manager = self.file_entry_manager.write().await; file_manager.insert(parent_file_id, file_id, path); } @@ -139,9 +138,9 @@ impl RawFileSystem for DefaultRawFileSystem { async fn init(&self) -> Result<()> { // init root directory self.insert_file_entry_locked( - Self::ROOT_DIR_PARENT_FILE_ID, - Self::ROOT_DIR_FILE_ID, - Self::ROOT_DIR_NAME, + ROOT_DIR_PARENT_FILE_ID, + ROOT_DIR_FILE_ID, + Path::new(ROOT_DIR_PATH), ) .await; self.fs.init().await @@ -149,7 +148,7 @@ impl RawFileSystem for DefaultRawFileSystem { async fn get_file_path(&self, file_id: u64) -> Result { let file_entry = self.get_file_entry(file_id).await?; - Ok(file_entry.path) + Ok(file_entry.path.to_string_lossy().to_string()) } async fn valid_file_handle_id(&self, file_id: u64, fh: u64) -> Result<()> { @@ -174,12 +173,15 @@ impl RawFileSystem for DefaultRawFileSystem { Ok(file_stat) } - async fn lookup(&self, parent_file_id: u64, name: &str) -> Result { + async fn lookup(&self, parent_file_id: u64, name: &OsStr) -> Result { let parent_file_entry = self.get_file_entry(parent_file_id).await?; - let mut file_stat = self.fs.lookup(&parent_file_entry.path, name).await?; + + let path = parent_file_entry.path.join(name); + let mut file_stat = self.fs.stat(&path).await?; // fill the file id to file stat self.resolve_file_id_to_filestat(&mut file_stat, parent_file_id) .await; + Ok(file_stat) } @@ -203,11 +205,16 @@ impl RawFileSystem for DefaultRawFileSystem { .await } - async fn create_file(&self, parent_file_id: u64, name: &str, flags: u32) -> Result { + async fn create_file( + &self, + parent_file_id: u64, + name: &OsStr, + flags: u32, + ) -> Result { let parent_file_entry = self.get_file_entry(parent_file_id).await?; let mut file_without_id = self .fs - .create_file(&parent_file_entry.path, name, OpenFileFlags(flags)) + .create_file(&parent_file_entry.path.join(name), OpenFileFlags(flags)) .await?; file_without_id.set_file_id(parent_file_id, self.next_file_id()); @@ -226,9 +233,10 @@ impl RawFileSystem for DefaultRawFileSystem { Ok(opened_file_with_file_handle_id.file_handle()) } - async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result { + async fn create_dir(&self, parent_file_id: u64, name: &OsStr) -> Result { let parent_file_entry = self.get_file_entry(parent_file_id).await?; - let mut filestat = self.fs.create_dir(&parent_file_entry.path, name).await?; + let path = parent_file_entry.path.join(name); + let mut filestat = self.fs.create_dir(&path).await?; filestat.set_file_id(parent_file_id, self.next_file_id()); @@ -243,23 +251,23 @@ impl RawFileSystem for DefaultRawFileSystem { self.fs.set_attr(&file_entry.path, file_stat, true).await } - async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()> { + async fn remove_file(&self, parent_file_id: u64, name: &OsStr) -> Result<()> { let parent_file_entry = self.get_file_entry(parent_file_id).await?; - self.fs.remove_file(&parent_file_entry.path, name).await?; + let path = parent_file_entry.path.join(name); + self.fs.remove_file(&path).await?; // remove the file from file entry manager - self.remove_file_entry_locked(&join_file_path(&parent_file_entry.path, name)) - .await; + self.remove_file_entry_locked(&path).await; Ok(()) } - async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()> { + async fn remove_dir(&self, parent_file_id: u64, name: &OsStr) -> Result<()> { let parent_file_entry = self.get_file_entry(parent_file_id).await?; - self.fs.remove_dir(&parent_file_entry.path, name).await?; + let path = parent_file_entry.path.join(name); + self.fs.remove_dir(&path).await?; // remove the dir from file entry manager - self.remove_file_entry_locked(&join_file_path(&parent_file_entry.path, name)) - .await; + self.remove_file_entry_locked(&path).await; Ok(()) } @@ -324,7 +332,7 @@ impl RawFileSystem for DefaultRawFileSystem { struct FileEntry { file_id: u64, parent_file_id: u64, - path: String, + path: PathBuf, } /// FileEntryManager is manage all the file entries in memory. it is used manger the file relationship and name mapping. @@ -333,7 +341,7 @@ struct FileEntryManager { file_id_map: HashMap, // file_path_map is a map of file path to file entry. - file_path_map: HashMap, + file_path_map: HashMap, } impl FileEntryManager { @@ -348,21 +356,21 @@ impl FileEntryManager { self.file_id_map.get(&file_id).cloned() } - fn get_file_entry_by_path(&self, path: &str) -> Option { + fn get_file_entry_by_path(&self, path: &Path) -> Option { self.file_path_map.get(path).cloned() } - fn insert(&mut self, parent_file_id: u64, file_id: u64, path: &str) { + fn insert(&mut self, parent_file_id: u64, file_id: u64, path: &Path) { let file_entry = FileEntry { file_id, parent_file_id, - path: path.to_string(), + path: path.into(), }; self.file_id_map.insert(file_id, file_entry.clone()); - self.file_path_map.insert(path.to_string(), file_entry); + self.file_path_map.insert(path.into(), file_entry); } - fn remove(&mut self, path: &str) { + fn remove(&mut self, path: &Path) { if let Some(file) = self.file_path_map.remove(path) { self.file_id_map.remove(&file.file_id); } @@ -372,23 +380,34 @@ impl FileEntryManager { #[cfg(test)] mod tests { use super::*; + use crate::filesystem::tests::TestRawFileSystem; + use crate::memory_filesystem::MemoryFileSystem; #[test] fn test_file_entry_manager() { let mut manager = FileEntryManager::new(); - manager.insert(1, 2, "a/b"); + manager.insert(1, 2, Path::new("a/b")); let file = manager.get_file_entry_by_id(2).unwrap(); assert_eq!(file.file_id, 2); assert_eq!(file.parent_file_id, 1); - assert_eq!(file.path, "a/b"); + assert_eq!(file.path, Path::new("a/b")); - let file = manager.get_file_entry_by_path("a/b").unwrap(); + let file = manager.get_file_entry_by_path(Path::new("a/b")).unwrap(); assert_eq!(file.file_id, 2); assert_eq!(file.parent_file_id, 1); - assert_eq!(file.path, "a/b"); + assert_eq!(file.path, Path::new("a/b")); - manager.remove("a/b"); + manager.remove(Path::new("a/b")); assert!(manager.get_file_entry_by_id(2).is_none()); - assert!(manager.get_file_entry_by_path("a/b").is_none()); + assert!(manager.get_file_entry_by_path(Path::new("a/b")).is_none()); + } + + #[tokio::test] + async fn test_default_raw_file_system() { + let memory_fs = MemoryFileSystem::new().await; + let raw_fs = DefaultRawFileSystem::new(memory_fs); + let _ = raw_fs.init().await; + let mut tester = TestRawFileSystem::new(raw_fs); + tester.test_raw_file_system().await; } } diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs index b0d32ded233..d9440b0e652 100644 --- a/clients/filesystem-fuse/src/filesystem.rs +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -17,14 +17,22 @@ * under the License. */ use crate::opened_file::{FileHandle, OpenFileFlags, OpenedFile}; -use crate::utils::{join_file_path, split_file_path}; use async_trait::async_trait; use bytes::Bytes; +use fuse3::FileType::{Directory, RegularFile}; use fuse3::{Errno, FileType, Timestamp}; +use std::ffi::{OsStr, OsString}; +use std::path::{Path, PathBuf}; use std::time::SystemTime; pub(crate) type Result = std::result::Result; +pub(crate) const ROOT_DIR_PARENT_FILE_ID: u64 = 1; +pub(crate) const ROOT_DIR_FILE_ID: u64 = 1; +pub(crate) const ROOT_DIR_NAME: &str = ""; +pub(crate) const ROOT_DIR_PATH: &str = "/"; +pub(crate) const INITIAL_FILE_ID: u64 = 10000; + /// RawFileSystem interface for the file system implementation. it use by FuseApiHandle, /// it ues the file id to operate the file system apis /// the `file_id` and `parent_file_id` it is the unique identifier for the file system, @@ -47,7 +55,7 @@ pub(crate) trait RawFileSystem: Send + Sync { async fn stat(&self, file_id: u64) -> Result; /// Lookup the file by parent file id and file name, if the file exists, return the file stat - async fn lookup(&self, parent_file_id: u64, name: &str) -> Result; + async fn lookup(&self, parent_file_id: u64, name: &OsStr) -> Result; /// Read the directory by file id, if the file id is a valid directory, return the file stat list async fn read_dir(&self, dir_file_id: u64) -> Result>; @@ -59,19 +67,24 @@ pub(crate) trait RawFileSystem: Send + Sync { async fn open_dir(&self, file_id: u64, flags: u32) -> Result; /// Create the file by parent file id and file name and flags, if successful, return the file handle - async fn create_file(&self, parent_file_id: u64, name: &str, flags: u32) -> Result; + async fn create_file( + &self, + parent_file_id: u64, + name: &OsStr, + flags: u32, + ) -> Result; /// Create the directory by parent file id and file name, if successful, return the file id - async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result; + async fn create_dir(&self, parent_file_id: u64, name: &OsStr) -> Result; /// Set the file attribute by file id and file stat async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()>; /// Remove the file by parent file id and file name - async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()>; + async fn remove_file(&self, parent_file_id: u64, name: &OsStr) -> Result<()>; /// Remove the directory by parent file id and file name - async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()>; + async fn remove_dir(&self, parent_file_id: u64, name: &OsStr) -> Result<()>; /// Close the file by file id and file handle, if successful async fn close_file(&self, file_id: u64, fh: u64) -> Result<()>; @@ -91,39 +104,31 @@ pub(crate) trait PathFileSystem: Send + Sync { async fn init(&self) -> Result<()>; /// Get the file stat by file path, if the file exists, return the file stat - async fn stat(&self, path: &str) -> Result; - - /// Get the file stat by parent file path and file name, if the file exists, return the file stat - async fn lookup(&self, parent: &str, name: &str) -> Result; + async fn stat(&self, path: &Path) -> Result; /// Read the directory by file path, if the directory exists, return the file stat list - async fn read_dir(&self, path: &str) -> Result>; + async fn read_dir(&self, path: &Path) -> Result>; /// Open the file by file path and flags, if the file exists, return the opened file - async fn open_file(&self, path: &str, flags: OpenFileFlags) -> Result; + async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result; /// Open the directory by file path and flags, if the file exists, return the opened file - async fn open_dir(&self, path: &str, flags: OpenFileFlags) -> Result; + async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result; - /// Create the file by parent file path and file name and flags, if successful return the opened file - async fn create_file( - &self, - parent: &str, - name: &str, - flags: OpenFileFlags, - ) -> Result; + /// Create the file by file path and flags, if successful, return the opened file + async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result; - /// Create the directory by parent file path and file name, if successful, return the file stat - async fn create_dir(&self, parent: &str, name: &str) -> Result; + /// Create the directory by file path , if successful, return the file stat + async fn create_dir(&self, path: &Path) -> Result; /// Set the file attribute by file path and file stat - async fn set_attr(&self, path: &str, file_stat: &FileStat, flush: bool) -> Result<()>; + async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()>; - /// Remove the file by parent file path and file name - async fn remove_file(&self, parent: &str, name: &str) -> Result<()>; + /// Remove the file by file path + async fn remove_file(&self, path: &Path) -> Result<()>; - /// Remove the directory by parent file path and file name - async fn remove_dir(&self, parent: &str, name: &str) -> Result<()>; + /// Remove the directory by file path + async fn remove_dir(&self, path: &Path) -> Result<()>; } // FileSystemContext is the system environment for the fuse file system. @@ -166,10 +171,10 @@ pub struct FileStat { pub(crate) parent_file_id: u64, // file name - pub(crate) name: String, + pub(crate) name: OsString, // file path of the fuse file system root - pub(crate) path: String, + pub(crate) path: PathBuf, // file size pub(crate) size: u64, @@ -191,31 +196,33 @@ pub struct FileStat { } impl FileStat { - pub fn new_file_filestat_with_path(path: &str, size: u64) -> Self { - let (parent, name) = split_file_path(path); - Self::new_file_filestat(parent, name, size) + pub fn new_file_filestat_with_path(path: &Path, size: u64) -> Self { + Self::new_filestat(path, size, RegularFile) } - pub fn new_dir_filestat_with_path(path: &str) -> Self { - let (parent, name) = split_file_path(path); - Self::new_dir_filestat(parent, name) + pub fn new_dir_filestat_with_path(path: &Path) -> Self { + Self::new_filestat(path, 0, Directory) } - pub fn new_file_filestat(parent: &str, name: &str, size: u64) -> Self { - Self::new_filestat(parent, name, size, FileType::RegularFile) + pub fn new_file_filestat(parent: &Path, name: &OsStr, size: u64) -> Self { + let path = parent.join(name); + Self::new_filestat(&path, size, RegularFile) } - pub fn new_dir_filestat(parent: &str, name: &str) -> Self { - Self::new_filestat(parent, name, 0, FileType::Directory) + pub fn new_dir_filestat(parent: &Path, name: &OsStr) -> Self { + let path = parent.join(name); + Self::new_filestat(&path, 0, Directory) } - pub fn new_filestat(parent: &str, name: &str, size: u64, kind: FileType) -> Self { + pub fn new_filestat(path: &Path, size: u64, kind: FileType) -> Self { let atime = Timestamp::from(SystemTime::now()); + // root directory name is "" + let name = path.file_name().unwrap_or(OsStr::new(ROOT_DIR_NAME)); Self { file_id: 0, parent_file_id: 0, - name: name.into(), - path: join_file_path(parent, name), + name: name.to_os_string(), + path: path.into(), size: size, kind: kind, atime: atime, @@ -262,43 +269,414 @@ pub trait FileWriter: Sync + Send { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; + use std::collections::HashMap; + + pub(crate) struct TestPathFileSystem { + files: HashMap, + fs: F, + } + + impl TestPathFileSystem { + pub(crate) fn new(fs: F) -> Self { + Self { + files: HashMap::new(), + fs, + } + } + + pub(crate) async fn test_path_file_system(&mut self) { + // Test root dir + self.test_root_dir().await; + + // Test stat file + self.test_stat_file(Path::new("/.gvfs_meta"), RegularFile, 0) + .await; + + // Test create file + self.test_create_file(Path::new("/file1.txt")).await; + + // Test create dir + self.test_create_dir(Path::new("/dir1")).await; + + // Test list dir + self.test_list_dir(Path::new("/")).await; + + // Test remove file + self.test_remove_file(Path::new("/file1.txt")).await; + + // Test remove dir + self.test_remove_dir(Path::new("/dir1")).await; + + // Test file not found + self.test_file_not_found(Path::new("unknown")).await; + + // Test list dir + self.test_list_dir(Path::new("/")).await; + } + + async fn test_root_dir(&mut self) { + let root_dir_path = Path::new("/"); + let root_file_stat = self.fs.stat(root_dir_path).await; + assert!(root_file_stat.is_ok()); + let root_file_stat = root_file_stat.unwrap(); + self.assert_file_stat(&root_file_stat, root_dir_path, Directory, 0); + } + + async fn test_stat_file(&mut self, path: &Path, expect_kind: FileType, expect_size: u64) { + let file_stat = self.fs.stat(path).await; + assert!(file_stat.is_ok()); + let file_stat = file_stat.unwrap(); + self.assert_file_stat(&file_stat, path, expect_kind, expect_size); + self.files.insert(file_stat.path.clone(), file_stat); + } + + async fn test_create_file(&mut self, path: &Path) { + let opened_file = self.fs.create_file(path, OpenFileFlags(0)).await; + assert!(opened_file.is_ok()); + let file = opened_file.unwrap(); + self.assert_file_stat(&file.file_stat, path, FileType::RegularFile, 0); + self.test_stat_file(path, RegularFile, 0).await; + } + + async fn test_create_dir(&mut self, path: &Path) { + let dir_stat = self.fs.create_dir(path).await; + assert!(dir_stat.is_ok()); + let dir_stat = dir_stat.unwrap(); + self.assert_file_stat(&dir_stat, path, Directory, 0); + self.test_stat_file(path, Directory, 0).await; + } + + async fn test_list_dir(&self, path: &Path) { + let list_dir = self.fs.read_dir(path).await; + assert!(list_dir.is_ok()); + let list_dir = list_dir.unwrap(); + assert_eq!(list_dir.len(), self.files.len()); + for file_stat in list_dir { + assert!(self.files.contains_key(&file_stat.path)); + let actual_file_stat = self.files.get(&file_stat.path).unwrap(); + self.assert_file_stat( + &file_stat, + &actual_file_stat.path, + actual_file_stat.kind, + actual_file_stat.size, + ); + } + } + + async fn test_remove_file(&mut self, path: &Path) { + let remove_file = self.fs.remove_file(path).await; + assert!(remove_file.is_ok()); + self.files.remove(path); + + self.test_file_not_found(path).await; + } + + async fn test_remove_dir(&mut self, path: &Path) { + let remove_dir = self.fs.remove_dir(path).await; + assert!(remove_dir.is_ok()); + self.files.remove(path); + + self.test_file_not_found(path).await; + } + + async fn test_file_not_found(&self, path: &Path) { + let not_found_file = self.fs.stat(path).await; + assert!(not_found_file.is_err()); + } + + fn assert_file_stat(&self, file_stat: &FileStat, path: &Path, kind: FileType, size: u64) { + assert_eq!(file_stat.path, path); + assert_eq!(file_stat.kind, kind); + assert_eq!(file_stat.size, size); + } + } + + pub(crate) struct TestRawFileSystem { + fs: F, + files: HashMap, + } + + impl TestRawFileSystem { + pub(crate) fn new(fs: F) -> Self { + Self { + fs, + files: HashMap::new(), + } + } + + pub(crate) async fn test_raw_file_system(&mut self) { + // Test root dir + self.test_root_dir().await; + + let parent_file_id = ROOT_DIR_FILE_ID; + // Test lookup file + let file_id = self + .test_lookup_file(parent_file_id, ".gvfs_meta".as_ref(), RegularFile, 0) + .await; + + // Test get file stat + self.test_stat_file(file_id, Path::new("/.gvfs_meta"), RegularFile, 0) + .await; + + // Test get file path + self.test_get_file_path(file_id, "/.gvfs_meta").await; + + // Test create file + self.test_create_file(parent_file_id, "file1.txt".as_ref()) + .await; + + // Test open file + let file_handle = self + .test_open_file(parent_file_id, "file1.txt".as_ref()) + .await; + + // Test write file + self.test_write_file(&file_handle, "test").await; + + // Test read file + self.test_read_file(&file_handle, "test").await; + + // Test close file + self.test_close_file(&file_handle).await; + + // Test create dir + self.test_create_dir(parent_file_id, "dir1".as_ref()).await; + + // Test list dir + self.test_list_dir(parent_file_id).await; + + // Test remove file + self.test_remove_file(parent_file_id, "file1.txt".as_ref()) + .await; + + // Test remove dir + self.test_remove_dir(parent_file_id, "dir1".as_ref()).await; + + // Test list dir again + self.test_list_dir(parent_file_id).await; + + // Test file not found + self.test_file_not_found(23).await; + } + + async fn test_root_dir(&self) { + let root_file_stat = self.fs.stat(ROOT_DIR_FILE_ID).await; + assert!(root_file_stat.is_ok()); + let root_file_stat = root_file_stat.unwrap(); + self.assert_file_stat( + &root_file_stat, + Path::new(ROOT_DIR_PATH), + FileType::Directory, + 0, + ); + } + + async fn test_lookup_file( + &mut self, + parent_file_id: u64, + expect_name: &OsStr, + expect_kind: FileType, + expect_size: u64, + ) -> u64 { + let file_stat = self.fs.lookup(parent_file_id, expect_name).await; + assert!(file_stat.is_ok()); + let file_stat = file_stat.unwrap(); + self.assert_file_stat(&file_stat, &file_stat.path, expect_kind, expect_size); + assert_eq!(file_stat.name, expect_name); + let file_id = file_stat.file_id; + self.files.insert(file_stat.file_id, file_stat); + file_id + } + + async fn test_get_file_path(&mut self, file_id: u64, expect_path: &str) { + let file_path = self.fs.get_file_path(file_id).await; + assert!(file_path.is_ok()); + assert_eq!(file_path.unwrap(), expect_path); + } + + async fn test_stat_file( + &mut self, + file_id: u64, + expect_path: &Path, + expect_kind: FileType, + expect_size: u64, + ) { + let file_stat = self.fs.stat(file_id).await; + assert!(file_stat.is_ok()); + let file_stat = file_stat.unwrap(); + self.assert_file_stat(&file_stat, expect_path, expect_kind, expect_size); + self.files.insert(file_stat.file_id, file_stat); + } + + async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) { + let file = self.fs.create_file(root_file_id, name, 0).await; + assert!(file.is_ok()); + let file = file.unwrap(); + assert!(file.handle_id > 0); + assert!(file.file_id >= INITIAL_FILE_ID); + let file_stat = self.fs.stat(file.file_id).await; + assert!(file_stat.is_ok()); + + self.test_stat_file(file.file_id, &file_stat.unwrap().path, RegularFile, 0) + .await; + } + + async fn test_open_file(&self, root_file_id: u64, name: &OsStr) -> FileHandle { + let file = self.fs.lookup(root_file_id, name).await.unwrap(); + let file_handle = self.fs.open_file(file.file_id, 0).await; + assert!(file_handle.is_ok()); + let file_handle = file_handle.unwrap(); + assert_eq!(file_handle.file_id, file.file_id); + file_handle + } + + async fn test_write_file(&mut self, file_handle: &FileHandle, content: &str) { + let write_size = self + .fs + .write( + file_handle.file_id, + file_handle.handle_id, + 0, + content.as_bytes(), + ) + .await; + assert!(write_size.is_ok()); + assert_eq!(write_size.unwrap(), content.len() as u32); + + self.files.get_mut(&file_handle.file_id).unwrap().size = content.len() as u64; + } + + async fn test_read_file(&self, file_handle: &FileHandle, expected_content: &str) { + let read_data = self + .fs + .read( + file_handle.file_id, + file_handle.handle_id, + 0, + expected_content.len() as u32, + ) + .await; + assert!(read_data.is_ok()); + assert_eq!(read_data.unwrap(), expected_content.as_bytes()); + } + + async fn test_close_file(&self, file_handle: &FileHandle) { + let close_file = self + .fs + .close_file(file_handle.file_id, file_handle.handle_id) + .await; + assert!(close_file.is_ok()); + } + + async fn test_create_dir(&mut self, parent_file_id: u64, name: &OsStr) { + let dir = self.fs.create_dir(parent_file_id, name).await; + assert!(dir.is_ok()); + let dir_file_id = dir.unwrap(); + assert!(dir_file_id >= INITIAL_FILE_ID); + let dir_stat = self.fs.stat(dir_file_id).await; + assert!(dir_stat.is_ok()); + + self.test_stat_file(dir_file_id, &dir_stat.unwrap().path, Directory, 0) + .await; + } + + async fn test_list_dir(&self, root_file_id: u64) { + let list_dir = self.fs.read_dir(root_file_id).await; + assert!(list_dir.is_ok()); + let list_dir = list_dir.unwrap(); + assert_eq!(list_dir.len(), self.files.len()); + for file_stat in list_dir { + assert!(self.files.contains_key(&file_stat.file_id)); + let actual_file_stat = self.files.get(&file_stat.file_id).unwrap(); + self.assert_file_stat( + &file_stat, + &actual_file_stat.path, + actual_file_stat.kind, + actual_file_stat.size, + ); + } + } + + async fn test_remove_file(&mut self, root_file_id: u64, name: &OsStr) { + let file_stat = self.fs.lookup(root_file_id, name).await; + assert!(file_stat.is_ok()); + let file_stat = file_stat.unwrap(); + + let remove_file = self.fs.remove_file(root_file_id, name).await; + assert!(remove_file.is_ok()); + self.files.remove(&file_stat.file_id); + + self.test_file_not_found(file_stat.file_id).await; + } + + async fn test_remove_dir(&mut self, root_file_id: u64, name: &OsStr) { + let file_stat = self.fs.lookup(root_file_id, name).await; + assert!(file_stat.is_ok()); + let file_stat = file_stat.unwrap(); + + let remove_dir = self.fs.remove_dir(root_file_id, name).await; + assert!(remove_dir.is_ok()); + self.files.remove(&file_stat.file_id); + + self.test_file_not_found(file_stat.file_id).await; + } + + async fn test_file_not_found(&self, file_id: u64) { + let not_found_file = self.fs.stat(file_id).await; + assert!(not_found_file.is_err()); + } + + fn assert_file_stat(&self, file_stat: &FileStat, path: &Path, kind: FileType, size: u64) { + assert_eq!(file_stat.path, path); + assert_eq!(file_stat.kind, kind); + assert_eq!(file_stat.size, size); + if file_stat.file_id == 1 { + assert_eq!(file_stat.parent_file_id, 1); + } else { + assert!(file_stat.file_id >= INITIAL_FILE_ID); + assert!( + file_stat.parent_file_id == 1 || file_stat.parent_file_id >= INITIAL_FILE_ID + ); + } + } + } #[test] fn test_create_file_stat() { //test new file - let file_stat = FileStat::new_file_filestat("a", "b", 10); + let file_stat = FileStat::new_file_filestat(Path::new("a"), "b".as_ref(), 10); assert_eq!(file_stat.name, "b"); - assert_eq!(file_stat.path, "a/b"); + assert_eq!(file_stat.path, Path::new("a/b")); assert_eq!(file_stat.size, 10); assert_eq!(file_stat.kind, FileType::RegularFile); //test new dir - let file_stat = FileStat::new_dir_filestat("a", "b"); + let file_stat = FileStat::new_dir_filestat("a".as_ref(), "b".as_ref()); assert_eq!(file_stat.name, "b"); - assert_eq!(file_stat.path, "a/b"); + assert_eq!(file_stat.path, Path::new("a/b")); assert_eq!(file_stat.size, 0); assert_eq!(file_stat.kind, FileType::Directory); //test new file with path - let file_stat = FileStat::new_file_filestat_with_path("a/b", 10); + let file_stat = FileStat::new_file_filestat_with_path("a/b".as_ref(), 10); assert_eq!(file_stat.name, "b"); - assert_eq!(file_stat.path, "a/b"); + assert_eq!(file_stat.path, Path::new("a/b")); assert_eq!(file_stat.size, 10); assert_eq!(file_stat.kind, FileType::RegularFile); //test new dir with path - let file_stat = FileStat::new_dir_filestat_with_path("a/b"); + let file_stat = FileStat::new_dir_filestat_with_path("a/b".as_ref()); assert_eq!(file_stat.name, "b"); - assert_eq!(file_stat.path, "a/b"); + assert_eq!(file_stat.path, Path::new("a/b")); assert_eq!(file_stat.size, 0); assert_eq!(file_stat.kind, FileType::Directory); } #[test] fn test_file_stat_set_file_id() { - let mut file_stat = FileStat::new_file_filestat("a", "b", 10); + let mut file_stat = FileStat::new_file_filestat("a".as_ref(), "b".as_ref(), 10); file_stat.set_file_id(1, 2); assert_eq!(file_stat.file_id, 2); assert_eq!(file_stat.parent_file_id, 1); @@ -307,7 +685,7 @@ mod tests { #[test] #[should_panic(expected = "assertion failed: file_id != 0 && parent_file_id != 0")] fn test_file_stat_set_file_id_panic() { - let mut file_stat = FileStat::new_file_filestat("a", "b", 10); + let mut file_stat = FileStat::new_file_filestat("a".as_ref(), "b".as_ref(), 10); file_stat.set_file_id(1, 0); } } diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs b/clients/filesystem-fuse/src/fuse_api_handle.rs index 7dc5461ce7f..1f24e94ee86 100644 --- a/clients/filesystem-fuse/src/fuse_api_handle.rs +++ b/clients/filesystem-fuse/src/fuse_api_handle.rs @@ -95,8 +95,7 @@ impl Filesystem for FuseApiHandle { parent: Inode, name: &OsStr, ) -> fuse3::Result { - let name = name.to_string_lossy(); - let file_stat = self.fs.lookup(parent, &name).await?; + let file_stat = self.fs.lookup(parent, name).await?; Ok(ReplyEntry { ttl: self.default_ttl, attr: fstat_to_file_attr(&file_stat, &self.fs_context), @@ -154,8 +153,7 @@ impl Filesystem for FuseApiHandle { _mode: u32, _umask: u32, ) -> fuse3::Result { - let name = name.to_string_lossy(); - let handle_id = self.fs.create_dir(parent, &name).await?; + let handle_id = self.fs.create_dir(parent, name).await?; Ok(ReplyEntry { ttl: self.default_ttl, attr: dummy_file_attr( @@ -169,14 +167,12 @@ impl Filesystem for FuseApiHandle { } async fn unlink(&self, _req: Request, parent: Inode, name: &OsStr) -> fuse3::Result<()> { - let name = name.to_string_lossy(); - self.fs.remove_file(parent, &name).await?; + self.fs.remove_file(parent, name).await?; Ok(()) } async fn rmdir(&self, _req: Request, parent: Inode, name: &OsStr) -> fuse3::Result<()> { - let name = name.to_string_lossy(); - self.fs.remove_dir(parent, &name).await?; + self.fs.remove_dir(parent, name).await?; Ok(()) } @@ -267,7 +263,7 @@ impl Filesystem for FuseApiHandle { stream::iter(files.into_iter().enumerate().map(|(index, file_stat)| { Ok(DirectoryEntry { inode: file_stat.file_id, - name: file_stat.name.clone().into(), + name: file_stat.name.clone(), kind: file_stat.kind, offset: (index + 3) as i64, }) @@ -313,8 +309,7 @@ impl Filesystem for FuseApiHandle { _mode: u32, flags: u32, ) -> fuse3::Result { - let name = name.to_string_lossy(); - let file_handle = self.fs.create_file(parent, &name, flags).await?; + let file_handle = self.fs.create_file(parent, name, flags).await?; Ok(ReplyCreated { ttl: self.default_ttl, attr: dummy_file_attr( @@ -349,7 +344,7 @@ impl Filesystem for FuseApiHandle { stream::iter(files.into_iter().enumerate().map(|(index, file_stat)| { Ok(DirectoryEntryPlus { inode: file_stat.file_id, - name: file_stat.name.clone().into(), + name: file_stat.name.clone(), kind: file_stat.kind, offset: (index + 3) as i64, attr: fstat_to_file_attr(&file_stat, &self.fs_context), @@ -465,8 +460,8 @@ mod test { let file_stat = FileStat { file_id: 1, parent_file_id: 3, - name: "test".to_string(), - path: "".to_string(), + name: "test".into(), + path: "".into(), size: 10032, kind: FileType::RegularFile, atime: Timestamp { sec: 10, nsec: 3 }, diff --git a/clients/filesystem-fuse/src/fuse_server.rs b/clients/filesystem-fuse/src/fuse_server.rs new file mode 100644 index 00000000000..dae7c28a631 --- /dev/null +++ b/clients/filesystem-fuse/src/fuse_server.rs @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use fuse3::raw::{Filesystem, Session}; +use fuse3::{MountOptions, Result}; +use log::{error, info}; +use std::process::exit; +use std::sync::Arc; +use tokio::select; +use tokio::sync::Notify; + +/// Represents a FUSE server capable of starting and stopping the FUSE filesystem. +pub struct FuseServer { + // Notification for stop + close_notify: Arc, + + // Mount point of the FUSE filesystem + mount_point: String, +} + +impl FuseServer { + /// Creates a new instance of `FuseServer`. + pub fn new(mount_point: &str) -> Self { + Self { + close_notify: Arc::new(Default::default()), + mount_point: mount_point.to_string(), + } + } + + /// Starts the FUSE filesystem and blocks until it is stopped. + pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> Result<()> { + //check if the mount point exists + if !std::path::Path::new(&self.mount_point).exists() { + error!("Mount point {} does not exist", self.mount_point); + exit(libc::ENOENT); + } + + info!( + "Starting FUSE filesystem and mounting at {}", + self.mount_point + ); + + let mount_options = MountOptions::default(); + let mut mount_handle = Session::new(mount_options) + .mount_with_unprivileged(fuse_fs, &self.mount_point) + .await?; + + let handle = &mut mount_handle; + + select! { + res = handle => { + if res.is_err() { + error!("Failed to mount FUSE filesystem: {:?}", res.err()); + } + }, + _ = self.close_notify.notified() => { + if let Err(e) = mount_handle.unmount().await { + error!("Failed to unmount FUSE filesystem: {:?}", e); + } else { + info!("FUSE filesystem unmounted successfully."); + } + } + } + + // notify that the filesystem is stopped + self.close_notify.notify_one(); + Ok(()) + } + + /// Stops the FUSE filesystem. + pub async fn stop(&self) { + info!("Stopping FUSE filesystem..."); + self.close_notify.notify_one(); + + // wait for the filesystem to stop + self.close_notify.notified().await; + } +} diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs index c1689bac476..36e8c28d343 100644 --- a/clients/filesystem-fuse/src/lib.rs +++ b/clients/filesystem-fuse/src/lib.rs @@ -19,6 +19,17 @@ mod default_raw_filesystem; mod filesystem; mod fuse_api_handle; +mod fuse_server; +mod memory_filesystem; +mod mount; mod opened_file; mod opened_file_manager; mod utils; + +pub async fn gvfs_mount(mount_point: &str) -> fuse3::Result<()> { + mount::mount(mount_point).await +} + +pub async fn gvfs_unmount() { + mount::unmount().await; +} diff --git a/clients/filesystem-fuse/src/main.rs b/clients/filesystem-fuse/src/main.rs index 3d8e9dbb953..28866a9bb1c 100644 --- a/clients/filesystem-fuse/src/main.rs +++ b/clients/filesystem-fuse/src/main.rs @@ -16,69 +16,18 @@ * specific language governing permissions and limitations * under the License. */ -mod default_raw_filesystem; -mod filesystem; -mod fuse_api_handle; -mod opened_file; -mod opened_file_manager; -mod utils; - -use log::debug; +use gvfs_fuse::{gvfs_mount, gvfs_unmount}; use log::info; -use std::process::exit; +use tokio::signal; #[tokio::main] -async fn main() { +async fn main() -> fuse3::Result<()> { tracing_subscriber::fmt().init(); - info!("Starting filesystem..."); - debug!("Shutdown filesystem..."); - exit(0); -} + tokio::spawn(async { gvfs_mount("gvfs").await }); -async fn create_gvfs_fuse_filesystem() { - // Gvfs-fuse filesystem structure: - // FuseApiHandle - // ├─ DefaultRawFileSystem (RawFileSystem) - // │ └─ FileSystemLog (PathFileSystem) - // │ ├─ GravitinoComposedFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ S3FileSystem (PathFileSystem) - // │ │ │ └─ OpenDALFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ HDFSFileSystem (PathFileSystem) - // │ │ │ └─ OpenDALFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ JuiceFileSystem (PathFileSystem) - // │ │ │ └─ NasFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ XXXFileSystem (PathFileSystem) - // - // `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs. - // It manages file and directory relationships, as well as file mappings. - // It delegates file operations to the PathFileSystem - // - // `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs. - // Similar implementations include permissions, caching, and metrics. - // - // `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`. - // It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage. - // If the user only mounts a fileset, this layer is not present. There will only be one below layer. - // - // `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path. - // and delegate the operation to the real storage. - // - // `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage. - // it can assess the S3, HDFS, gcs, azblob and other storage. - // - // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage. - // - // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage. - // - // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS. - // - // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage. - // - // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. + let _ = signal::ctrl_c().await; + info!("Received Ctrl+C, Unmounting gvfs..."); + gvfs_unmount().await; - todo!("Implement the createGvfsFuseFileSystem function"); + Ok(()) } diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs b/clients/filesystem-fuse/src/memory_filesystem.rs new file mode 100644 index 00000000000..ca3f13fd9a6 --- /dev/null +++ b/clients/filesystem-fuse/src/memory_filesystem.rs @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::{FileReader, FileStat, FileWriter, PathFileSystem, Result}; +use crate::opened_file::{OpenFileFlags, OpenedFile}; +use async_trait::async_trait; +use bytes::Bytes; +use fuse3::FileType::{Directory, RegularFile}; +use fuse3::{Errno, FileType}; +use std::collections::BTreeMap; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, RwLock}; + +// Simple in-memory file implementation of MemoryFileSystem +struct MemoryFile { + kind: FileType, + data: Arc>>, +} + +// MemoryFileSystem is a simple in-memory filesystem implementation +// It is used for testing purposes +pub struct MemoryFileSystem { + // file_map is a map of file name to file size + file_map: RwLock>, +} + +impl MemoryFileSystem { + const FS_META_FILE_NAME: &'static str = "/.gvfs_meta"; + + pub(crate) async fn new() -> Self { + Self { + file_map: RwLock::new(Default::default()), + } + } + + fn create_file_stat(&self, path: &Path, file: &MemoryFile) -> FileStat { + match file.kind { + Directory => FileStat::new_dir_filestat_with_path(path), + _ => { + FileStat::new_file_filestat_with_path(path, file.data.lock().unwrap().len() as u64) + } + } + } +} + +#[async_trait] +impl PathFileSystem for MemoryFileSystem { + async fn init(&self) -> Result<()> { + let root_file = MemoryFile { + kind: Directory, + data: Arc::new(Mutex::new(Vec::new())), + }; + let root_path = PathBuf::from("/"); + self.file_map.write().unwrap().insert(root_path, root_file); + + let meta_file = MemoryFile { + kind: RegularFile, + data: Arc::new(Mutex::new(Vec::new())), + }; + let meta_file_path = Path::new(Self::FS_META_FILE_NAME).to_path_buf(); + self.file_map + .write() + .unwrap() + .insert(meta_file_path, meta_file); + Ok(()) + } + + async fn stat(&self, path: &Path) -> Result { + self.file_map + .read() + .unwrap() + .get(path) + .map(|x| self.create_file_stat(path, x)) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn read_dir(&self, path: &Path) -> Result> { + let file_map = self.file_map.read().unwrap(); + + let results: Vec = file_map + .iter() + .filter(|x| path_in_dir(path, x.0)) + .map(|(k, v)| self.create_file_stat(k, v)) + .collect(); + + Ok(results) + } + + async fn open_file(&self, path: &Path, _flags: OpenFileFlags) -> Result { + let file_stat = self.stat(path).await?; + let mut opened_file = OpenedFile::new(file_stat); + match opened_file.file_stat.kind { + Directory => Ok(opened_file), + RegularFile => { + let data = self + .file_map + .read() + .unwrap() + .get(&opened_file.file_stat.path) + .unwrap() + .data + .clone(); + opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() })); + opened_file.writer = Some(Box::new(MemoryFileWriter { data: data })); + Ok(opened_file) + } + _ => Err(Errno::from(libc::EBADF)), + } + } + + async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result { + self.open_file(path, flags).await + } + + async fn create_file(&self, path: &Path, _flags: OpenFileFlags) -> Result { + let mut file_map = self.file_map.write().unwrap(); + if file_map.contains_key(path) { + return Err(Errno::from(libc::EEXIST)); + } + + let mut opened_file = OpenedFile::new(FileStat::new_file_filestat_with_path(path, 0)); + + let data = Arc::new(Mutex::new(Vec::new())); + file_map.insert( + opened_file.file_stat.path.clone(), + MemoryFile { + kind: RegularFile, + data: data.clone(), + }, + ); + + opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() })); + opened_file.writer = Some(Box::new(MemoryFileWriter { data: data })); + + Ok(opened_file) + } + + async fn create_dir(&self, path: &Path) -> Result { + let mut file_map = self.file_map.write().unwrap(); + if file_map.contains_key(path) { + return Err(Errno::from(libc::EEXIST)); + } + + let file = FileStat::new_dir_filestat_with_path(path); + file_map.insert( + file.path.clone(), + MemoryFile { + kind: Directory, + data: Arc::new(Mutex::new(Vec::new())), + }, + ); + + Ok(file) + } + + async fn set_attr(&self, _name: &Path, _file_stat: &FileStat, _flush: bool) -> Result<()> { + Ok(()) + } + + async fn remove_file(&self, path: &Path) -> Result<()> { + let mut file_map = self.file_map.write().unwrap(); + if file_map.remove(path).is_none() { + return Err(Errno::from(libc::ENOENT)); + } + Ok(()) + } + + async fn remove_dir(&self, path: &Path) -> Result<()> { + let mut file_map = self.file_map.write().unwrap(); + let count = file_map.iter().filter(|x| path_in_dir(path, x.0)).count(); + + if count != 0 { + return Err(Errno::from(libc::ENOTEMPTY)); + } + + if file_map.remove(path).is_none() { + return Err(Errno::from(libc::ENOENT)); + } + Ok(()) + } +} + +pub(crate) struct MemoryFileReader { + pub(crate) data: Arc>>, +} + +#[async_trait] +impl FileReader for MemoryFileReader { + async fn read(&mut self, offset: u64, size: u32) -> Result { + let v = self.data.lock().unwrap(); + let start = offset as usize; + let end = usize::min(start + size as usize, v.len()); + if start >= v.len() { + return Ok(Bytes::default()); + } + Ok(v[start..end].to_vec().into()) + } +} + +pub(crate) struct MemoryFileWriter { + pub(crate) data: Arc>>, +} + +#[async_trait] +impl FileWriter for MemoryFileWriter { + async fn write(&mut self, offset: u64, data: &[u8]) -> Result { + let mut v = self.data.lock().unwrap(); + let start = offset as usize; + let end = start + data.len(); + + if v.len() < end { + v.resize(end, 0); + } + v[start..end].copy_from_slice(data); + Ok(data.len() as u32) + } +} + +fn path_in_dir(dir: &Path, path: &Path) -> bool { + if let Ok(relative_path) = path.strip_prefix(dir) { + relative_path.components().count() == 1 + } else { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::filesystem::tests::TestPathFileSystem; + + #[test] + fn test_path_in_dir() { + let dir = Path::new("/parent"); + + let path1 = Path::new("/parent/child1"); + let path2 = Path::new("/parent/a.txt"); + let path3 = Path::new("/parent/child1/grandchild"); + let path4 = Path::new("/other"); + + assert!(!path_in_dir(dir, dir)); + assert!(path_in_dir(dir, path1)); + assert!(path_in_dir(dir, path2)); + assert!(!path_in_dir(dir, path3)); + assert!(!path_in_dir(dir, path4)); + + let dir = Path::new("/"); + + let path1 = Path::new("/child1"); + let path2 = Path::new("/a.txt"); + let path3 = Path::new("/child1/grandchild"); + + assert!(!path_in_dir(dir, dir)); + assert!(path_in_dir(dir, path1)); + assert!(path_in_dir(dir, path2)); + assert!(!path_in_dir(dir, path3)); + } + + #[tokio::test] + async fn test_memory_file_system() { + let fs = MemoryFileSystem::new().await; + let _ = fs.init().await; + let mut tester = TestPathFileSystem::new(fs); + tester.test_path_file_system().await; + } +} diff --git a/clients/filesystem-fuse/src/mount.rs b/clients/filesystem-fuse/src/mount.rs new file mode 100644 index 00000000000..102e2401643 --- /dev/null +++ b/clients/filesystem-fuse/src/mount.rs @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::default_raw_filesystem::DefaultRawFileSystem; +use crate::filesystem::FileSystemContext; +use crate::fuse_api_handle::FuseApiHandle; +use crate::fuse_server::FuseServer; +use crate::memory_filesystem::MemoryFileSystem; +use fuse3::raw::Filesystem; +use log::info; +use once_cell::sync::Lazy; +use std::sync::Arc; +use tokio::sync::Mutex; + +static SERVER: Lazy>>> = Lazy::new(|| Mutex::new(None)); + +pub async fn mount(mount_point: &str) -> fuse3::Result<()> { + info!("Starting gvfs-fuse server..."); + let svr = Arc::new(FuseServer::new(mount_point)); + { + let mut server = SERVER.lock().await; + *server = Some(svr.clone()); + } + let fs = create_fuse_fs().await; + svr.start(fs).await +} + +pub async fn unmount() { + info!("Stop gvfs-fuse server..."); + let svr = { + let mut server = SERVER.lock().await; + if server.is_none() { + info!("Server is already stopped."); + return; + } + server.take().unwrap() + }; + let _ = svr.stop().await; +} + +pub async fn create_fuse_fs() -> impl Filesystem + Sync + 'static { + let uid = unsafe { libc::getuid() }; + let gid = unsafe { libc::getgid() }; + let fs_context = FileSystemContext { + uid: uid, + gid: gid, + default_file_perm: 0o644, + default_dir_perm: 0o755, + block_size: 4 * 1024, + }; + + let gvfs = MemoryFileSystem::new().await; + let fs = DefaultRawFileSystem::new(gvfs); + FuseApiHandle::new(fs, fs_context) +} + +pub async fn create_gvfs_filesystem() { + // Gvfs-fuse filesystem structure: + // FuseApiHandle + // ├─ DefaultRawFileSystem (RawFileSystem) + // │ └─ FileSystemLog (PathFileSystem) + // │ ├─ GravitinoComposedFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ S3FileSystem (PathFileSystem) + // │ │ │ └─ OpenDALFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ HDFSFileSystem (PathFileSystem) + // │ │ │ └─ OpenDALFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ JuiceFileSystem (PathFileSystem) + // │ │ │ └─ NasFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ XXXFileSystem (PathFileSystem) + // + // `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs. + // It manages file and directory relationships, as well as file mappings. + // It delegates file operations to the PathFileSystem + // + // `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs. + // Similar implementations include permissions, caching, and metrics. + // + // `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`. + // It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage. + // If the user only mounts a fileset, this layer is not present. There will only be one below layer. + // + // `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path. + // and delegate the operation to the real storage. + // + // `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage. + // it can assess the S3, HDFS, gcs, azblob and other storage. + // + // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage. + // + // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage. + // + // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS. + // + // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage. + // + // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. + + todo!("Implement the createGvfsFuseFileSystem function"); +} diff --git a/clients/filesystem-fuse/src/opened_file.rs b/clients/filesystem-fuse/src/opened_file.rs index ba3e41595da..5bc961c9a6b 100644 --- a/clients/filesystem-fuse/src/opened_file.rs +++ b/clients/filesystem-fuse/src/opened_file.rs @@ -126,10 +126,15 @@ pub(crate) struct OpenFileFlags(pub(crate) u32); mod tests { use super::*; use crate::filesystem::FileStat; + use std::path::Path; #[test] fn test_open_file() { - let mut open_file = OpenedFile::new(FileStat::new_file_filestat("a", "b", 10)); + let mut open_file = OpenedFile::new(FileStat::new_file_filestat( + Path::new("a"), + "b".as_ref(), + 10, + )); assert_eq!(open_file.file_stat.name, "b"); assert_eq!(open_file.file_stat.size, 10); diff --git a/clients/filesystem-fuse/src/opened_file_manager.rs b/clients/filesystem-fuse/src/opened_file_manager.rs index 17bfe00a397..ab6a5d82347 100644 --- a/clients/filesystem-fuse/src/opened_file_manager.rs +++ b/clients/filesystem-fuse/src/opened_file_manager.rs @@ -69,13 +69,14 @@ impl OpenedFileManager { mod tests { use super::*; use crate::filesystem::FileStat; + use std::path::Path; #[tokio::test] async fn test_opened_file_manager() { let manager = OpenedFileManager::new(); - let file1_stat = FileStat::new_file_filestat("", "a.txt", 13); - let file2_stat = FileStat::new_file_filestat("", "b.txt", 18); + let file1_stat = FileStat::new_file_filestat(Path::new(""), "a.txt".as_ref(), 13); + let file2_stat = FileStat::new_file_filestat(Path::new(""), "b.txt".as_ref(), 18); let file1 = OpenedFile::new(file1_stat.clone()); let file2 = OpenedFile::new(file2_stat.clone()); diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs index 0c0cc80a162..21e52f86af8 100644 --- a/clients/filesystem-fuse/src/utils.rs +++ b/clients/filesystem-fuse/src/utils.rs @@ -16,52 +16,6 @@ * specific language governing permissions and limitations * under the License. */ -use crate::filesystem::RawFileSystem; - -// join the parent and name to a path -pub fn join_file_path(parent: &str, name: &str) -> String { - //TODO handle corner cases - if parent.is_empty() { - name.to_string() - } else { - format!("{}/{}", parent, name) - } -} - -// split the path to parent and name -pub fn split_file_path(path: &str) -> (&str, &str) { - match path.rfind('/') { - Some(pos) => (&path[..pos], &path[pos + 1..]), - None => ("", path), - } -} - -// convert file id to file path string if file id is invalid return "Unknown" -pub async fn file_id_to_file_path_string(file_id: u64, fs: &impl RawFileSystem) -> String { - fs.get_file_path(file_id) - .await - .unwrap_or("Unknown".to_string()) -} #[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_join_file_path() { - assert_eq!(join_file_path("", "a"), "a"); - assert_eq!(join_file_path("", "a.txt"), "a.txt"); - assert_eq!(join_file_path("a", "b"), "a/b"); - assert_eq!(join_file_path("a/b", "c"), "a/b/c"); - assert_eq!(join_file_path("a/b", "c.txt"), "a/b/c.txt"); - } - - #[test] - fn test_split_file_path() { - assert_eq!(split_file_path("a"), ("", "a")); - assert_eq!(split_file_path("a.txt"), ("", "a.txt")); - assert_eq!(split_file_path("a/b"), ("a", "b")); - assert_eq!(split_file_path("a/b/c"), ("a/b", "c")); - assert_eq!(split_file_path("a/b/c.txt"), ("a/b", "c.txt")); - } -} +mod tests {} diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs new file mode 100644 index 00000000000..23aafbaf6e4 --- /dev/null +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use gvfs_fuse::{gvfs_mount, gvfs_unmount}; +use log::info; +use std::fs; +use std::fs::File; +use std::path::Path; +use std::sync::Arc; +use std::thread::sleep; +use std::time::{Duration, Instant}; +use tokio::runtime::Runtime; +use tokio::task::JoinHandle; + +struct FuseTest { + runtime: Arc, + mount_point: String, + gvfs_mount: Option>>, +} + +impl FuseTest { + pub fn setup(&mut self) { + info!("Start gvfs fuse server"); + let mount_point = self.mount_point.clone(); + self.runtime + .spawn(async move { gvfs_mount(&mount_point).await }); + let success = Self::wait_for_fuse_server_ready(&self.mount_point, Duration::from_secs(15)); + assert!(success, "Fuse server cannot start up at 15 seconds"); + } + + pub fn shutdown(&mut self) { + self.runtime.block_on(async { + gvfs_unmount().await; + }); + } + + fn wait_for_fuse_server_ready(path: &str, timeout: Duration) -> bool { + let test_file = format!("{}/.gvfs_meta", path); + let start_time = Instant::now(); + + while start_time.elapsed() < timeout { + if file_exists(&test_file) { + return true; + } + info!("Wait for fuse server ready",); + sleep(Duration::from_secs(1)); + } + false + } +} + +impl Drop for FuseTest { + fn drop(&mut self) { + info!("Shutdown fuse server"); + self.shutdown(); + } +} + +#[test] +fn test_fuse_system_with_auto() { + tracing_subscriber::fmt().init(); + + let mount_point = "build/gvfs"; + let _ = fs::create_dir_all(mount_point); + + let mut test = FuseTest { + runtime: Arc::new(Runtime::new().unwrap()), + mount_point: mount_point.to_string(), + gvfs_mount: None, + }; + + test.setup(); + test_fuse_filesystem(mount_point); +} + +fn test_fuse_system_with_manual() { + test_fuse_filesystem("build/gvfs"); +} + +fn test_fuse_filesystem(mount_point: &str) { + info!("Test startup"); + let base_path = Path::new(mount_point); + + //test create file + let test_file = base_path.join("test_create"); + let file = File::create(&test_file).expect("Failed to create file"); + assert!(file.metadata().is_ok(), "Failed to get file metadata"); + assert!(file_exists(&test_file)); + + //test write file + fs::write(&test_file, "read test").expect("Failed to write file"); + + //test read file + let content = fs::read_to_string(test_file.clone()).expect("Failed to read file"); + assert_eq!(content, "read test", "File content mismatch"); + + //test delete file + fs::remove_file(test_file.clone()).expect("Failed to delete file"); + assert!(!file_exists(test_file)); + + //test create directory + let test_dir = base_path.join("test_dir"); + fs::create_dir(&test_dir).expect("Failed to create directory"); + + //test create file in directory + let test_file = base_path.join("test_dir/test_file"); + let file = File::create(&test_file).expect("Failed to create file"); + assert!(file.metadata().is_ok(), "Failed to get file metadata"); + + //test write file in directory + let test_file = base_path.join("test_dir/test_read"); + fs::write(&test_file, "read test").expect("Failed to write file"); + + //test read file in directory + let content = fs::read_to_string(&test_file).expect("Failed to read file"); + assert_eq!(content, "read test", "File content mismatch"); + + //test delete file in directory + fs::remove_file(&test_file).expect("Failed to delete file"); + assert!(!file_exists(&test_file)); + + //test delete directory + fs::remove_dir_all(&test_dir).expect("Failed to delete directory"); + assert!(!file_exists(&test_dir)); + + info!("Success test"); +} + +fn file_exists>(path: P) -> bool { + fs::metadata(path).is_ok() +} diff --git a/clients/filesystem-fuse/tests/it.rs b/clients/filesystem-fuse/tests/it.rs deleted file mode 100644 index 989e5f9895e..00000000000 --- a/clients/filesystem-fuse/tests/it.rs +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#[test] -fn test_math_add() { - assert_eq!(1, 1); -}