From 42171d411a7c84e7f50a8b1d260e36194866c5dc Mon Sep 17 00:00:00 2001 From: Yuhui Date: Tue, 31 Dec 2024 10:15:39 +0800 Subject: [PATCH] [#5982] feat (gvfs-fuse): Implement Gravitino fileset file system (#5984) ### What changes were proposed in this pull request? Implement an Gravitino fileset file system, Support mount fileset to local directory ### Why are the changes needed? Fix: #5982 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT and IT --- clients/filesystem-fuse/Cargo.toml | 7 + clients/filesystem-fuse/conf/gvfs_fuse.toml | 38 ++ clients/filesystem-fuse/src/config.rs | 330 ++++++++++++++++++ .../src/default_raw_filesystem.rs | 32 +- clients/filesystem-fuse/src/error.rs | 69 ++++ clients/filesystem-fuse/src/filesystem.rs | 56 ++- .../filesystem-fuse/src/fuse_api_handle.rs | 3 +- clients/filesystem-fuse/src/fuse_server.rs | 8 +- .../filesystem-fuse/src/gravitino_client.rs | 277 +++++++++++++++ .../src/gravitino_fileset_filesystem.rs | 130 +++++++ clients/filesystem-fuse/src/gvfs_fuse.rs | 246 +++++++++++++ clients/filesystem-fuse/src/lib.rs | 17 +- clients/filesystem-fuse/src/main.rs | 21 +- .../filesystem-fuse/src/memory_filesystem.rs | 8 +- clients/filesystem-fuse/src/mount.rs | 118 ------- clients/filesystem-fuse/src/utils.rs | 3 + .../tests/conf/gvfs_fuse_memory.toml | 40 +++ .../tests/conf/gvfs_fuse_test.toml | 40 +++ clients/filesystem-fuse/tests/fuse_test.rs | 10 +- 19 files changed, 1281 insertions(+), 172 deletions(-) create mode 100644 clients/filesystem-fuse/conf/gvfs_fuse.toml create mode 100644 clients/filesystem-fuse/src/config.rs create mode 100644 clients/filesystem-fuse/src/error.rs create mode 100644 clients/filesystem-fuse/src/gravitino_client.rs create mode 100644 clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs create mode 100644 clients/filesystem-fuse/src/gvfs_fuse.rs delete mode 100644 clients/filesystem-fuse/src/mount.rs create mode 100644 clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml create mode 100644 clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/Cargo.toml index 75a4dd71301..4008ec5ca2f 100644 --- a/clients/filesystem-fuse/Cargo.toml +++ b/clients/filesystem-fuse/Cargo.toml @@ -35,11 +35,18 @@ name = "gvfs_fuse" [dependencies] async-trait = "0.1" bytes = "1.6.0" +config = "0.13" dashmap = "6.1.0" fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] } futures-util = "0.3.30" libc = "0.2.168" log = "0.4.22" once_cell = "1.20.2" +reqwest = { version = "0.12.9", features = ["json"] } +serde = { version = "1.0.216", features = ["derive"] } tokio = { version = "1.38.0", features = ["full"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +urlencoding = "2.1.3" + +[dev-dependencies] +mockito = "0.31" diff --git a/clients/filesystem-fuse/conf/gvfs_fuse.toml b/clients/filesystem-fuse/conf/gvfs_fuse.toml new file mode 100644 index 00000000000..94d3d8560fd --- /dev/null +++ b/clients/filesystem-fuse/conf/gvfs_fuse.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# fuse settings +[fuse] +file_mask = 0o600 +dir_mask = 0o700 +fs_type = "memory" + +[fuse.properties] + +# filesystem settings +[filesystem] +block_size = 8192 + +# Gravitino settings +[gravitino] +uri = "http://localhost:8090" +metalake = "your_metalake" + +# extent settings +[extend_config] +access_key = "your access_key" +secret_key = "your_secret_key" diff --git a/clients/filesystem-fuse/src/config.rs b/clients/filesystem-fuse/src/config.rs new file mode 100644 index 00000000000..b381caa75c5 --- /dev/null +++ b/clients/filesystem-fuse/src/config.rs @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::error::ErrorCode::{ConfigNotFound, InvalidConfig}; +use crate::utils::GvfsResult; +use config::{builder, Config}; +use log::{error, info, warn}; +use serde::Deserialize; +use std::collections::HashMap; +use std::fs; + +pub(crate) const CONF_FUSE_FILE_MASK: ConfigEntity = ConfigEntity::new( + FuseConfig::MODULE_NAME, + "file_mask", + "The default file mask for the FUSE filesystem", + 0o600, +); + +pub(crate) const CONF_FUSE_DIR_MASK: ConfigEntity = ConfigEntity::new( + FuseConfig::MODULE_NAME, + "dir_mask", + "The default directory mask for the FUSE filesystem", + 0o700, +); + +pub(crate) const CONF_FUSE_FS_TYPE: ConfigEntity<&'static str> = ConfigEntity::new( + FuseConfig::MODULE_NAME, + "fs_type", + "The type of the FUSE filesystem", + "memory", +); + +pub(crate) const CONF_FUSE_CONFIG_PATH: ConfigEntity<&'static str> = ConfigEntity::new( + FuseConfig::MODULE_NAME, + "config_path", + "The path of the FUSE configuration file", + "/etc/gvfs/gvfs.toml", +); + +pub(crate) const CONF_FILESYSTEM_BLOCK_SIZE: ConfigEntity = ConfigEntity::new( + FilesystemConfig::MODULE_NAME, + "block_size", + "The block size of the gvfs fuse filesystem", + 4096, +); + +pub(crate) const CONF_GRAVITINO_URI: ConfigEntity<&'static str> = ConfigEntity::new( + GravitinoConfig::MODULE_NAME, + "uri", + "The URI of the Gravitino server", + "http://localhost:8090", +); + +pub(crate) const CONF_GRAVITINO_METALAKE: ConfigEntity<&'static str> = ConfigEntity::new( + GravitinoConfig::MODULE_NAME, + "metalake", + "The metalake of the Gravitino server", + "", +); + +pub(crate) struct ConfigEntity { + module: &'static str, + name: &'static str, + description: &'static str, + pub(crate) default: T, +} + +impl ConfigEntity { + const fn new( + module: &'static str, + name: &'static str, + description: &'static str, + default: T, + ) -> Self { + ConfigEntity { + module: module, + name: name, + description: description, + default: default, + } + } +} + +enum ConfigValue { + I32(ConfigEntity), + U32(ConfigEntity), + String(ConfigEntity<&'static str>), + Bool(ConfigEntity), + Float(ConfigEntity), +} + +struct DefaultConfig { + configs: HashMap, +} + +impl Default for DefaultConfig { + fn default() -> Self { + let mut configs = HashMap::new(); + + configs.insert( + Self::compose_key(CONF_FUSE_FILE_MASK), + ConfigValue::U32(CONF_FUSE_FILE_MASK), + ); + configs.insert( + Self::compose_key(CONF_FUSE_DIR_MASK), + ConfigValue::U32(CONF_FUSE_DIR_MASK), + ); + configs.insert( + Self::compose_key(CONF_FUSE_FS_TYPE), + ConfigValue::String(CONF_FUSE_FS_TYPE), + ); + configs.insert( + Self::compose_key(CONF_FUSE_CONFIG_PATH), + ConfigValue::String(CONF_FUSE_CONFIG_PATH), + ); + configs.insert( + Self::compose_key(CONF_GRAVITINO_URI), + ConfigValue::String(CONF_GRAVITINO_URI), + ); + configs.insert( + Self::compose_key(CONF_GRAVITINO_METALAKE), + ConfigValue::String(CONF_GRAVITINO_METALAKE), + ); + configs.insert( + Self::compose_key(CONF_FILESYSTEM_BLOCK_SIZE), + ConfigValue::U32(CONF_FILESYSTEM_BLOCK_SIZE), + ); + + DefaultConfig { configs } + } +} + +impl DefaultConfig { + fn compose_key(entity: ConfigEntity) -> String { + format!("{}.{}", entity.module, entity.name) + } +} + +#[derive(Debug, Deserialize)] +pub struct AppConfig { + #[serde(default)] + pub fuse: FuseConfig, + #[serde(default)] + pub filesystem: FilesystemConfig, + #[serde(default)] + pub gravitino: GravitinoConfig, + #[serde(default)] + pub extend_config: HashMap, +} + +impl Default for AppConfig { + fn default() -> Self { + let builder = Self::crete_default_config_builder(); + let conf = builder + .build() + .expect("Failed to build default configuration"); + conf.try_deserialize::() + .expect("Failed to deserialize default AppConfig") + } +} + +type ConfigBuilder = builder::ConfigBuilder; + +impl AppConfig { + fn crete_default_config_builder() -> ConfigBuilder { + let default = DefaultConfig::default(); + + default + .configs + .values() + .fold( + Config::builder(), + |builder, config_entity| match config_entity { + ConfigValue::I32(entity) => Self::add_config(builder, entity), + ConfigValue::U32(entity) => Self::add_config(builder, entity), + ConfigValue::String(entity) => Self::add_config(builder, entity), + ConfigValue::Bool(entity) => Self::add_config(builder, entity), + ConfigValue::Float(entity) => Self::add_config(builder, entity), + }, + ) + } + + fn add_config>( + builder: ConfigBuilder, + entity: &ConfigEntity, + ) -> ConfigBuilder { + let name = format!("{}.{}", entity.module, entity.name); + builder + .set_default(&name, entity.default.clone().into()) + .unwrap_or_else(|e| panic!("Failed to set default for {}: {}", entity.name, e)) + } + + pub fn from_file(config_file_path: Option<&str>) -> GvfsResult { + let builder = Self::crete_default_config_builder(); + + let config_path = { + if config_file_path.is_some() { + let path = config_file_path.unwrap(); + //check config file exists + if fs::metadata(path).is_err() { + return Err( + ConfigNotFound.to_error("The configuration file not found".to_string()) + ); + } + info!("Use configuration file: {}", path); + path + } else { + //use default config + if fs::metadata(CONF_FUSE_CONFIG_PATH.default).is_err() { + warn!( + "The default configuration file is not found, using the default configuration" + ); + return Ok(AppConfig::default()); + } else { + warn!( + "Using the default config file {}", + CONF_FUSE_CONFIG_PATH.default + ); + } + CONF_FUSE_CONFIG_PATH.default + } + }; + let config = builder + .add_source(config::File::with_name(config_path).required(true)) + .build(); + if let Err(e) = config { + let msg = format!("Failed to build configuration: {}", e); + error!("{}", msg); + return Err(InvalidConfig.to_error(msg)); + } + + let conf = config.unwrap(); + let app_config = conf.try_deserialize::(); + + if let Err(e) = app_config { + let msg = format!("Failed to deserialize configuration: {}", e); + error!("{}", msg); + return Err(InvalidConfig.to_error(msg)); + } + Ok(app_config.unwrap()) + } +} + +#[derive(Debug, Deserialize, Default)] +pub struct FuseConfig { + #[serde(default)] + pub file_mask: u32, + #[serde(default)] + pub dir_mask: u32, + #[serde(default)] + pub fs_type: String, + #[serde(default)] + pub config_path: String, + #[serde(default)] + pub properties: HashMap, +} + +impl FuseConfig { + const MODULE_NAME: &'static str = "fuse"; +} + +#[derive(Debug, Deserialize, Default)] +pub struct FilesystemConfig { + #[serde(default)] + pub block_size: u32, +} + +impl FilesystemConfig { + const MODULE_NAME: &'static str = "filesystem"; +} + +#[derive(Debug, Deserialize, Default)] +pub struct GravitinoConfig { + #[serde(default)] + pub uri: String, + #[serde(default)] + pub metalake: String, +} + +impl GravitinoConfig { + const MODULE_NAME: &'static str = "gravitino"; +} + +#[cfg(test)] +mod test { + use crate::config::AppConfig; + + #[test] + fn test_config_from_file() { + let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_test.toml")).unwrap(); + assert_eq!(config.fuse.file_mask, 0o644); + assert_eq!(config.fuse.dir_mask, 0o755); + assert_eq!(config.filesystem.block_size, 8192); + assert_eq!(config.gravitino.uri, "http://localhost:8090"); + assert_eq!(config.gravitino.metalake, "test"); + assert_eq!( + config.extend_config.get("access_key"), + Some(&"XXX_access_key".to_string()) + ); + assert_eq!( + config.extend_config.get("secret_key"), + Some(&"XXX_secret_key".to_string()) + ); + } + + #[test] + fn test_default_config() { + let config = AppConfig::default(); + assert_eq!(config.fuse.file_mask, 0o600); + assert_eq!(config.fuse.dir_mask, 0o700); + assert_eq!(config.filesystem.block_size, 4096); + assert_eq!(config.gravitino.uri, "http://localhost:8090"); + assert_eq!(config.gravitino.metalake, ""); + } +} diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs b/clients/filesystem-fuse/src/default_raw_filesystem.rs index 0ab92e91640..0c9836e5b33 100644 --- a/clients/filesystem-fuse/src/default_raw_filesystem.rs +++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs @@ -16,9 +16,10 @@ * specific language governing permissions and limitations * under the License. */ +use crate::config::AppConfig; use crate::filesystem::{ - FileStat, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, ROOT_DIR_FILE_ID, - ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH, + FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, + ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH, }; use crate::opened_file::{FileHandle, OpenFileFlags}; use crate::opened_file_manager::OpenedFileManager; @@ -47,7 +48,7 @@ pub struct DefaultRawFileSystem { } impl DefaultRawFileSystem { - pub(crate) fn new(fs: T) -> Self { + pub(crate) fn new(fs: T, _config: &AppConfig, _fs_context: &FileSystemContext) -> Self { Self { file_entry_manager: RwLock::new(FileEntryManager::new()), opened_file_manager: OpenedFileManager::new(), @@ -189,8 +190,7 @@ impl RawFileSystem for DefaultRawFileSystem { let file_entry = self.get_file_entry(file_id).await?; let mut child_filestats = self.fs.read_dir(&file_entry.path).await?; for file_stat in child_filestats.iter_mut() { - self.resolve_file_id_to_filestat(file_stat, file_stat.file_id) - .await; + self.resolve_file_id_to_filestat(file_stat, file_id).await; } Ok(child_filestats) } @@ -280,13 +280,7 @@ impl RawFileSystem for DefaultRawFileSystem { file.close().await } - async fn read( - &self, - _file_id: u64, - fh: u64, - offset: u64, - size: u32, - ) -> crate::filesystem::Result { + async fn read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) -> Result { let (data, file_stat) = { let opened_file = self .opened_file_manager @@ -303,13 +297,7 @@ impl RawFileSystem for DefaultRawFileSystem { data } - async fn write( - &self, - _file_id: u64, - fh: u64, - offset: u64, - data: &[u8], - ) -> crate::filesystem::Result { + async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result { let (len, file_stat) = { let opened_file = self .opened_file_manager @@ -405,7 +393,11 @@ mod tests { #[tokio::test] async fn test_default_raw_file_system() { let memory_fs = MemoryFileSystem::new().await; - let raw_fs = DefaultRawFileSystem::new(memory_fs); + let raw_fs = DefaultRawFileSystem::new( + memory_fs, + &AppConfig::default(), + &FileSystemContext::default(), + ); let _ = raw_fs.init().await; let mut tester = TestRawFileSystem::new(raw_fs); tester.test_raw_file_system().await; diff --git a/clients/filesystem-fuse/src/error.rs b/clients/filesystem-fuse/src/error.rs new file mode 100644 index 00000000000..ba3c037c5ca --- /dev/null +++ b/clients/filesystem-fuse/src/error.rs @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use fuse3::Errno; + +#[derive(Debug, Copy, Clone)] +pub enum ErrorCode { + UnSupportedFilesystem, + GravitinoClientError, + InvalidConfig, + ConfigNotFound, +} + +impl ErrorCode { + pub fn to_error(self, message: impl Into) -> GvfsError { + GvfsError::Error(self, message.into()) + } +} + +impl std::fmt::Display for ErrorCode { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + ErrorCode::UnSupportedFilesystem => write!(f, "Unsupported filesystem"), + ErrorCode::GravitinoClientError => write!(f, "Gravitino client error"), + ErrorCode::InvalidConfig => write!(f, "Invalid config"), + ErrorCode::ConfigNotFound => write!(f, "Config not found"), + } + } +} + +#[derive(Debug)] +pub enum GvfsError { + RestError(String, reqwest::Error), + Error(ErrorCode, String), + Errno(Errno), + IOError(std::io::Error), +} +impl From for GvfsError { + fn from(err: reqwest::Error) -> Self { + GvfsError::RestError("Http request failed:".to_owned() + &err.to_string(), err) + } +} + +impl From for GvfsError { + fn from(errno: Errno) -> Self { + GvfsError::Errno(errno) + } +} + +impl From for GvfsError { + fn from(err: std::io::Error) -> Self { + GvfsError::IOError(err) + } +} diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs index d9440b0e652..742cdd4c879 100644 --- a/clients/filesystem-fuse/src/filesystem.rs +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -16,6 +16,9 @@ * specific language governing permissions and limitations * under the License. */ +use crate::config::{ + AppConfig, CONF_FILESYSTEM_BLOCK_SIZE, CONF_FUSE_DIR_MASK, CONF_FUSE_FILE_MASK, +}; use crate::opened_file::{FileHandle, OpenFileFlags, OpenedFile}; use async_trait::async_trait; use bytes::Bytes; @@ -129,6 +132,8 @@ pub(crate) trait PathFileSystem: Send + Sync { /// Remove the directory by file path async fn remove_dir(&self, path: &Path) -> Result<()>; + + fn get_capacity(&self) -> Result; } // FileSystemContext is the system environment for the fuse file system. @@ -150,17 +155,30 @@ pub(crate) struct FileSystemContext { } impl FileSystemContext { - pub(crate) fn new(uid: u32, gid: u32) -> Self { + pub(crate) fn new(uid: u32, gid: u32, config: &AppConfig) -> Self { FileSystemContext { uid, gid, - default_file_perm: 0o644, - default_dir_perm: 0o755, - block_size: 4 * 1024, + default_file_perm: config.fuse.file_mask as u16, + default_dir_perm: config.fuse.dir_mask as u16, + block_size: config.filesystem.block_size, + } + } + + pub(crate) fn default() -> Self { + FileSystemContext { + uid: 0, + gid: 0, + default_file_perm: CONF_FUSE_FILE_MASK.default as u16, + default_dir_perm: CONF_FUSE_DIR_MASK.default as u16, + block_size: CONF_FILESYSTEM_BLOCK_SIZE.default, } } } +// capacity of the file system +pub struct FileSystemCapacity {} + // FileStat is the file metadata of the file #[derive(Clone, Debug)] pub struct FileStat { @@ -336,7 +354,7 @@ pub(crate) mod tests { let opened_file = self.fs.create_file(path, OpenFileFlags(0)).await; assert!(opened_file.is_ok()); let file = opened_file.unwrap(); - self.assert_file_stat(&file.file_stat, path, FileType::RegularFile, 0); + self.assert_file_stat(&file.file_stat, path, RegularFile, 0); self.test_stat_file(path, RegularFile, 0).await; } @@ -410,6 +428,9 @@ pub(crate) mod tests { // Test root dir self.test_root_dir().await; + // test read root dir + self.test_list_dir(ROOT_DIR_FILE_ID, false).await; + let parent_file_id = ROOT_DIR_FILE_ID; // Test lookup file let file_id = self @@ -445,7 +466,7 @@ pub(crate) mod tests { self.test_create_dir(parent_file_id, "dir1".as_ref()).await; // Test list dir - self.test_list_dir(parent_file_id).await; + self.test_list_dir(parent_file_id, true).await; // Test remove file self.test_remove_file(parent_file_id, "file1.txt".as_ref()) @@ -455,7 +476,7 @@ pub(crate) mod tests { self.test_remove_dir(parent_file_id, "dir1".as_ref()).await; // Test list dir again - self.test_list_dir(parent_file_id).await; + self.test_list_dir(parent_file_id, true).await; // Test file not found self.test_file_not_found(23).await; @@ -465,12 +486,7 @@ pub(crate) mod tests { let root_file_stat = self.fs.stat(ROOT_DIR_FILE_ID).await; assert!(root_file_stat.is_ok()); let root_file_stat = root_file_stat.unwrap(); - self.assert_file_stat( - &root_file_stat, - Path::new(ROOT_DIR_PATH), - FileType::Directory, - 0, - ); + self.assert_file_stat(&root_file_stat, Path::new(ROOT_DIR_PATH), Directory, 0); } async fn test_lookup_file( @@ -582,10 +598,14 @@ pub(crate) mod tests { .await; } - async fn test_list_dir(&self, root_file_id: u64) { + async fn test_list_dir(&self, root_file_id: u64, check_child: bool) { let list_dir = self.fs.read_dir(root_file_id).await; assert!(list_dir.is_ok()); let list_dir = list_dir.unwrap(); + + if !check_child { + return; + } assert_eq!(list_dir.len(), self.files.len()); for file_stat in list_dir { assert!(self.files.contains_key(&file_stat.file_id)); @@ -650,28 +670,28 @@ pub(crate) mod tests { assert_eq!(file_stat.name, "b"); assert_eq!(file_stat.path, Path::new("a/b")); assert_eq!(file_stat.size, 10); - assert_eq!(file_stat.kind, FileType::RegularFile); + assert_eq!(file_stat.kind, RegularFile); //test new dir let file_stat = FileStat::new_dir_filestat("a".as_ref(), "b".as_ref()); assert_eq!(file_stat.name, "b"); assert_eq!(file_stat.path, Path::new("a/b")); assert_eq!(file_stat.size, 0); - assert_eq!(file_stat.kind, FileType::Directory); + assert_eq!(file_stat.kind, Directory); //test new file with path let file_stat = FileStat::new_file_filestat_with_path("a/b".as_ref(), 10); assert_eq!(file_stat.name, "b"); assert_eq!(file_stat.path, Path::new("a/b")); assert_eq!(file_stat.size, 10); - assert_eq!(file_stat.kind, FileType::RegularFile); + assert_eq!(file_stat.kind, RegularFile); //test new dir with path let file_stat = FileStat::new_dir_filestat_with_path("a/b".as_ref()); assert_eq!(file_stat.name, "b"); assert_eq!(file_stat.path, Path::new("a/b")); assert_eq!(file_stat.size, 0); - assert_eq!(file_stat.kind, FileType::Directory); + assert_eq!(file_stat.kind, Directory); } #[test] diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs b/clients/filesystem-fuse/src/fuse_api_handle.rs index 1f24e94ee86..153e323891c 100644 --- a/clients/filesystem-fuse/src/fuse_api_handle.rs +++ b/clients/filesystem-fuse/src/fuse_api_handle.rs @@ -17,6 +17,7 @@ * under the License. */ +use crate::config::AppConfig; use crate::filesystem::{FileStat, FileSystemContext, RawFileSystem}; use fuse3::path::prelude::{ReplyData, ReplyOpen, ReplyStatFs, ReplyWrite}; use fuse3::path::Request; @@ -44,7 +45,7 @@ impl FuseApiHandle { const DEFAULT_ATTR_TTL: Duration = Duration::from_secs(1); const DEFAULT_MAX_WRITE_SIZE: u32 = 16 * 1024; - pub fn new(fs: T, context: FileSystemContext) -> Self { + pub fn new(fs: T, _config: &AppConfig, context: FileSystemContext) -> Self { Self { fs: fs, default_ttl: Self::DEFAULT_ATTR_TTL, diff --git a/clients/filesystem-fuse/src/fuse_server.rs b/clients/filesystem-fuse/src/fuse_server.rs index dae7c28a631..a059686e16c 100644 --- a/clients/filesystem-fuse/src/fuse_server.rs +++ b/clients/filesystem-fuse/src/fuse_server.rs @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ +use crate::utils::GvfsResult; use fuse3::raw::{Filesystem, Session}; -use fuse3::{MountOptions, Result}; +use fuse3::MountOptions; use log::{error, info}; use std::process::exit; use std::sync::Arc; @@ -43,7 +44,7 @@ impl FuseServer { } /// Starts the FUSE filesystem and blocks until it is stopped. - pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> Result<()> { + pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> GvfsResult<()> { //check if the mount point exists if !std::path::Path::new(&self.mount_point).exists() { error!("Mount point {} does not exist", self.mount_point); @@ -83,11 +84,12 @@ impl FuseServer { } /// Stops the FUSE filesystem. - pub async fn stop(&self) { + pub async fn stop(&self) -> GvfsResult<()> { info!("Stopping FUSE filesystem..."); self.close_notify.notify_one(); // wait for the filesystem to stop self.close_notify.notified().await; + Ok(()) } } diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs new file mode 100644 index 00000000000..e5553c9f6c8 --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_client.rs @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::GravitinoConfig; +use crate::error::{ErrorCode, GvfsError}; +use reqwest::Client; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt::Debug; +use urlencoding::encode; + +#[derive(Debug, Deserialize)] +pub(crate) struct Fileset { + pub(crate) name: String, + #[serde(rename = "type")] + pub(crate) fileset_type: String, + comment: String, + #[serde(rename = "storageLocation")] + pub(crate) storage_location: String, + properties: HashMap, +} + +#[derive(Debug, Deserialize)] +struct FilesetResponse { + code: u32, + fileset: Fileset, +} + +#[derive(Debug, Deserialize)] +struct FileLocationResponse { + code: u32, + #[serde(rename = "fileLocation")] + location: String, +} + +pub(crate) struct GravitinoClient { + gravitino_uri: String, + metalake: String, + + client: Client, +} + +impl GravitinoClient { + pub fn new(config: &GravitinoConfig) -> Self { + Self { + gravitino_uri: config.uri.clone(), + metalake: config.metalake.clone(), + client: Client::new(), + } + } + + pub fn init(&self) {} + + pub fn do_post(&self, _path: &str, _data: &str) { + todo!() + } + + pub fn request(&self, _path: &str, _data: &str) -> Result<(), GvfsError> { + todo!() + } + + pub fn list_schema(&self) -> Result<(), GvfsError> { + todo!() + } + + pub fn list_fileset(&self) -> Result<(), GvfsError> { + todo!() + } + + fn get_fileset_url(&self, catalog_name: &str, schema_name: &str, fileset_name: &str) -> String { + format!( + "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}", + self.gravitino_uri, self.metalake, catalog_name, schema_name, fileset_name + ) + } + + async fn do_get(&self, url: &str) -> Result + where + T: for<'de> Deserialize<'de>, + { + let http_resp = + self.client.get(url).send().await.map_err(|e| { + GvfsError::RestError(format!("Failed to send request to {}", url), e) + })?; + + let res = http_resp.json::().await.map_err(|e| { + GvfsError::RestError(format!("Failed to parse response from {}", url), e) + })?; + + Ok(res) + } + + pub async fn get_fileset( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + ) -> Result { + let url = self.get_fileset_url(catalog_name, schema_name, fileset_name); + let res = self.do_get::(&url).await?; + + if res.code != 0 { + return Err(GvfsError::Error( + ErrorCode::GravitinoClientError, + "Failed to get fileset".to_string(), + )); + } + Ok(res.fileset) + } + + pub fn get_file_location_url( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + path: &str, + ) -> String { + let encoded_path = encode(path); + format!( + "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}", + self.gravitino_uri, + self.metalake, + catalog_name, + schema_name, + fileset_name, + encoded_path + ) + } + + pub async fn get_file_location( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + path: &str, + ) -> Result { + let url = self.get_file_location_url(catalog_name, schema_name, fileset_name, path); + let res = self.do_get::(&url).await?; + + if res.code != 0 { + return Err(GvfsError::Error( + ErrorCode::GravitinoClientError, + "Failed to get file location".to_string(), + )); + } + Ok(res.location) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::mock; + + #[tokio::test] + async fn test_get_fileset_success() { + let fileset_response = r#" + { + "code": 0, + "fileset": { + "name": "example_fileset", + "type": "example_type", + "comment": "This is a test fileset", + "storageLocation": "/example/path", + "properties": { + "key1": "value1", + "key2": "value2" + } + } + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!( + "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}", + "test", "catalog1", "schema1", "fileset1" + ); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(fileset_response) + .create(); + + let config = GravitinoConfig { + uri: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client.get_fileset("catalog1", "schema1", "fileset1").await; + + match result { + Ok(fileset) => { + assert_eq!(fileset.name, "example_fileset"); + assert_eq!(fileset.fileset_type, "example_type"); + assert_eq!(fileset.storage_location, "/example/path"); + assert_eq!(fileset.properties.get("key1"), Some(&"value1".to_string())); + } + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } + + #[tokio::test] + async fn test_get_file_location_success() { + let file_location_response = r#" + { + "code": 0, + "fileLocation": "/mybucket/a" + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!( + "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}", + "test", + "catalog1", + "schema1", + "fileset1", + encode("/example/path") + ); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(file_location_response) + .create(); + + let config = GravitinoConfig { + uri: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client + .get_file_location("catalog1", "schema1", "fileset1", "/example/path") + .await; + + match result { + Ok(location) => { + assert_eq!(location, "/mybucket/a"); + } + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } + + async fn get_fileset_example() { + tracing_subscriber::fmt::init(); + let config = GravitinoConfig { + uri: "http://localhost:8090".to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + client.init(); + let result = client.get_fileset("c1", "s1", "fileset1").await; + if let Err(e) = &result { + println!("{:?}", e); + } + + let fileset = result.unwrap(); + println!("{:?}", fileset); + assert_eq!(fileset.name, "fileset1"); + } +} diff --git a/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs new file mode 100644 index 00000000000..98a295dbb87 --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::AppConfig; +use crate::filesystem::{FileStat, FileSystemCapacity, FileSystemContext, PathFileSystem, Result}; +use crate::gravitino_client::GravitinoClient; +use crate::opened_file::{OpenFileFlags, OpenedFile}; +use async_trait::async_trait; +use fuse3::Errno; +use std::path::{Path, PathBuf}; + +/// GravitinoFileSystem is a filesystem that is associated with a fileset in Gravitino. +/// It mapping the fileset path to the original data storage path. and delegate the operation +/// to the inner filesystem like S3 GCS, JuiceFS. +pub(crate) struct GravitinoFilesetFileSystem { + physical_fs: Box, + client: GravitinoClient, + fileset_location: PathBuf, +} + +impl GravitinoFilesetFileSystem { + pub async fn new( + fs: Box, + location: &Path, + client: GravitinoClient, + _config: &AppConfig, + _context: &FileSystemContext, + ) -> Self { + Self { + physical_fs: fs, + client: client, + fileset_location: location.into(), + } + } + + fn gvfs_path_to_raw_path(&self, path: &Path) -> PathBuf { + self.fileset_location.join(path) + } + + fn raw_path_to_gvfs_path(&self, path: &Path) -> Result { + path.strip_prefix(&self.fileset_location) + .map_err(|_| Errno::from(libc::EBADF))?; + Ok(path.into()) + } +} + +#[async_trait] +impl PathFileSystem for GravitinoFilesetFileSystem { + async fn init(&self) -> Result<()> { + self.physical_fs.init().await + } + + async fn stat(&self, path: &Path) -> Result { + let raw_path = self.gvfs_path_to_raw_path(path); + let mut file_stat = self.physical_fs.stat(&raw_path).await?; + file_stat.path = self.raw_path_to_gvfs_path(&file_stat.path)?; + Ok(file_stat) + } + + async fn read_dir(&self, path: &Path) -> Result> { + let raw_path = self.gvfs_path_to_raw_path(path); + let mut child_filestats = self.physical_fs.read_dir(&raw_path).await?; + for file_stat in child_filestats.iter_mut() { + file_stat.path = self.raw_path_to_gvfs_path(&file_stat.path)?; + } + Ok(child_filestats) + } + + async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + let raw_path = self.gvfs_path_to_raw_path(path); + let mut opened_file = self.physical_fs.open_file(&raw_path, flags).await?; + opened_file.file_stat.path = self.raw_path_to_gvfs_path(&opened_file.file_stat.path)?; + Ok(opened_file) + } + + async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result { + let raw_path = self.gvfs_path_to_raw_path(path); + let mut opened_file = self.physical_fs.open_dir(&raw_path, flags).await?; + opened_file.file_stat.path = self.raw_path_to_gvfs_path(&opened_file.file_stat.path)?; + Ok(opened_file) + } + + async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + let raw_path = self.gvfs_path_to_raw_path(path); + let mut opened_file = self.physical_fs.create_file(&raw_path, flags).await?; + opened_file.file_stat.path = self.raw_path_to_gvfs_path(&opened_file.file_stat.path)?; + Ok(opened_file) + } + + async fn create_dir(&self, path: &Path) -> Result { + let raw_path = self.gvfs_path_to_raw_path(path); + let mut file_stat = self.physical_fs.create_dir(&raw_path).await?; + file_stat.path = self.raw_path_to_gvfs_path(&file_stat.path)?; + Ok(file_stat) + } + + async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()> { + let raw_path = self.gvfs_path_to_raw_path(path); + self.physical_fs.set_attr(&raw_path, file_stat, flush).await + } + + async fn remove_file(&self, path: &Path) -> Result<()> { + let raw_path = self.gvfs_path_to_raw_path(path); + self.physical_fs.remove_file(&raw_path).await + } + + async fn remove_dir(&self, path: &Path) -> Result<()> { + let raw_path = self.gvfs_path_to_raw_path(path); + self.physical_fs.remove_dir(&raw_path).await + } + + fn get_capacity(&self) -> Result { + self.physical_fs.get_capacity() + } +} diff --git a/clients/filesystem-fuse/src/gvfs_fuse.rs b/clients/filesystem-fuse/src/gvfs_fuse.rs new file mode 100644 index 00000000000..d472895d2b3 --- /dev/null +++ b/clients/filesystem-fuse/src/gvfs_fuse.rs @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::AppConfig; +use crate::default_raw_filesystem::DefaultRawFileSystem; +use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem}; +use crate::filesystem::FileSystemContext; +use crate::fuse_api_handle::FuseApiHandle; +use crate::fuse_server::FuseServer; +use crate::gravitino_client::GravitinoClient; +use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem; +use crate::memory_filesystem::MemoryFileSystem; +use crate::utils::GvfsResult; +use log::info; +use once_cell::sync::Lazy; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::Mutex; + +const FILESET_PREFIX: &str = "gvfs://fileset/"; + +static SERVER: Lazy>>> = Lazy::new(|| Mutex::new(None)); + +pub(crate) enum CreateFileSystemResult { + Memory(MemoryFileSystem), + Gvfs(GravitinoFilesetFileSystem), + FuseMemoryFs(FuseApiHandle>), + FuseGvfs(FuseApiHandle>), + None, +} + +pub enum FileSystemSchema { + S3, +} + +pub async fn mount(mount_to: &str, mount_from: &str, config: &AppConfig) -> GvfsResult<()> { + info!("Starting gvfs-fuse server..."); + let svr = Arc::new(FuseServer::new(mount_to)); + { + let mut server = SERVER.lock().await; + *server = Some(svr.clone()); + } + let fs = create_fuse_fs(mount_from, config).await?; + match fs { + CreateFileSystemResult::FuseMemoryFs(vfs) => svr.start(vfs).await?, + CreateFileSystemResult::FuseGvfs(vfs) => svr.start(vfs).await?, + _ => return Err(UnSupportedFilesystem.to_error("Unsupported filesystem type".to_string())), + } + Ok(()) +} + +pub async fn unmount() -> GvfsResult<()> { + info!("Stop gvfs-fuse server..."); + let svr = { + let mut server = SERVER.lock().await; + if server.is_none() { + info!("Server is already stopped."); + return Ok(()); + } + server.take().unwrap() + }; + svr.stop().await +} + +pub(crate) async fn create_fuse_fs( + mount_from: &str, + config: &AppConfig, +) -> GvfsResult { + let uid = unsafe { libc::getuid() }; + let gid = unsafe { libc::getgid() }; + let fs_context = FileSystemContext::new(uid, gid, config); + let fs = create_path_fs(mount_from, config, &fs_context).await?; + create_raw_fs(fs, config, fs_context).await +} + +pub async fn create_raw_fs( + path_fs: CreateFileSystemResult, + config: &AppConfig, + fs_context: FileSystemContext, +) -> GvfsResult { + match path_fs { + CreateFileSystemResult::Memory(fs) => { + let fs = FuseApiHandle::new( + DefaultRawFileSystem::new(fs, config, &fs_context), + config, + fs_context, + ); + Ok(CreateFileSystemResult::FuseMemoryFs(fs)) + } + CreateFileSystemResult::Gvfs(fs) => { + let fs = FuseApiHandle::new( + DefaultRawFileSystem::new(fs, config, &fs_context), + config, + fs_context, + ); + Ok(CreateFileSystemResult::FuseGvfs(fs)) + } + _ => Err(UnSupportedFilesystem.to_error("Unsupported filesystem type".to_string())), + } +} + +pub async fn create_path_fs( + mount_from: &str, + config: &AppConfig, + fs_context: &FileSystemContext, +) -> GvfsResult { + if config.fuse.fs_type == "memory" { + Ok(CreateFileSystemResult::Memory( + MemoryFileSystem::new().await, + )) + } else { + create_gvfs_filesystem(mount_from, config, fs_context).await + } +} + +pub async fn create_gvfs_filesystem( + mount_from: &str, + config: &AppConfig, + fs_context: &FileSystemContext, +) -> GvfsResult { + // Gvfs-fuse filesystem structure: + // FuseApiHandle + // ├─ DefaultRawFileSystem (RawFileSystem) + // │ └─ FileSystemLog (PathFileSystem) + // │ ├─ GravitinoComposedFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ S3FileSystem (PathFileSystem) + // │ │ │ └─ OpenDALFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ HDFSFileSystem (PathFileSystem) + // │ │ │ └─ OpenDALFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ JuiceFileSystem (PathFileSystem) + // │ │ │ └─ NasFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ XXXFileSystem (PathFileSystem) + // + // `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs. + // It manages file and directory relationships, as well as file mappings. + // It delegates file operations to the PathFileSystem + // + // `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs. + // Similar implementations include permissions, caching, and metrics. + // + // `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`. + // It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage. + // If the user only mounts a fileset, this layer is not present. There will only be one below layer. + // + // `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path. + // and delegate the operation to the real storage. + // + // `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage. + // it can assess the S3, HDFS, gcs, azblob and other storage. + // + // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage. + // + // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage. + // + // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS. + // + // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage. + // + // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. + + let client = GravitinoClient::new(&config.gravitino); + + let (catalog, schema, fileset) = extract_fileset(mount_from)?; + let location = client + .get_fileset(&catalog, &schema, &fileset) + .await? + .storage_location; + let (_schema, location) = extract_storage_filesystem(&location).unwrap(); + + // todo need to replace the inner filesystem with the real storage filesystem + let inner_fs = MemoryFileSystem::new().await; + + let fs = GravitinoFilesetFileSystem::new( + Box::new(inner_fs), + Path::new(&location), + client, + config, + fs_context, + ) + .await; + Ok(CreateFileSystemResult::Gvfs(fs)) +} + +pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> { + if !path.starts_with(FILESET_PREFIX) { + return Err(InvalidConfig.to_error("Invalid fileset path".to_string())); + } + + let path_without_prefix = &path[FILESET_PREFIX.len()..]; + + let parts: Vec<&str> = path_without_prefix.split('/').collect(); + + if parts.len() != 3 { + return Err(InvalidConfig.to_error("Invalid fileset path".to_string())); + } + // todo handle mount catalog or schema + + let catalog = parts[1].to_string(); + let schema = parts[2].to_string(); + let fileset = parts[3].to_string(); + + Ok((catalog, schema, fileset)) +} + +pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema, String)> { + // todo need to improve the logic + if let Some(pos) = path.find("://") { + let protocol = &path[..pos]; + let location = &path[pos + 3..]; + let location = match location.find('/') { + Some(index) => &location[index + 1..], + None => "", + }; + let location = match location.ends_with('/') { + true => location.to_string(), + false => format!("{}/", location), + }; + + match protocol { + "s3" => Some((FileSystemSchema::S3, location.to_string())), + "s3a" => Some((FileSystemSchema::S3, location.to_string())), + _ => None, + } + } else { + None + } +} diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs index 36e8c28d343..5532d619e5c 100644 --- a/clients/filesystem-fuse/src/lib.rs +++ b/clients/filesystem-fuse/src/lib.rs @@ -16,20 +16,27 @@ * specific language governing permissions and limitations * under the License. */ +use crate::config::AppConfig; +use crate::utils::GvfsResult; + +pub mod config; mod default_raw_filesystem; +mod error; mod filesystem; mod fuse_api_handle; mod fuse_server; +mod gravitino_client; +mod gravitino_fileset_filesystem; +mod gvfs_fuse; mod memory_filesystem; -mod mount; mod opened_file; mod opened_file_manager; mod utils; -pub async fn gvfs_mount(mount_point: &str) -> fuse3::Result<()> { - mount::mount(mount_point).await +pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &AppConfig) -> GvfsResult<()> { + gvfs_fuse::mount(mount_to, mount_from, config).await } -pub async fn gvfs_unmount() { - mount::unmount().await; +pub async fn gvfs_unmount() -> GvfsResult<()> { + gvfs_fuse::unmount().await } diff --git a/clients/filesystem-fuse/src/main.rs b/clients/filesystem-fuse/src/main.rs index 28866a9bb1c..8eab5ec0d51 100644 --- a/clients/filesystem-fuse/src/main.rs +++ b/clients/filesystem-fuse/src/main.rs @@ -16,18 +16,33 @@ * specific language governing permissions and limitations * under the License. */ +use fuse3::Errno; +use gvfs_fuse::config::AppConfig; use gvfs_fuse::{gvfs_mount, gvfs_unmount}; -use log::info; +use log::{error, info}; use tokio::signal; #[tokio::main] async fn main() -> fuse3::Result<()> { tracing_subscriber::fmt().init(); - tokio::spawn(async { gvfs_mount("gvfs").await }); + + //todo(read config file from args) + let config = AppConfig::from_file(Some("conf/gvfs_fuse.toml")); + if let Err(e) = &config { + error!("Failed to load config: {:?}", e); + return Err(Errno::from(libc::EINVAL)); + } + let config = config.unwrap(); + let handle = tokio::spawn(async move { gvfs_mount("gvfs", "", &config).await }); let _ = signal::ctrl_c().await; info!("Received Ctrl+C, Unmounting gvfs..."); - gvfs_unmount().await; + if let Err(e) = handle.await { + error!("Failed to mount gvfs: {:?}", e); + return Err(Errno::from(libc::EINVAL)); + } + + let _ = gvfs_unmount().await; Ok(()) } diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs b/clients/filesystem-fuse/src/memory_filesystem.rs index ca3f13fd9a6..b94d16b8d39 100644 --- a/clients/filesystem-fuse/src/memory_filesystem.rs +++ b/clients/filesystem-fuse/src/memory_filesystem.rs @@ -16,7 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -use crate::filesystem::{FileReader, FileStat, FileWriter, PathFileSystem, Result}; +use crate::filesystem::{ + FileReader, FileStat, FileSystemCapacity, FileWriter, PathFileSystem, Result, +}; use crate::opened_file::{OpenFileFlags, OpenedFile}; use async_trait::async_trait; use bytes::Bytes; @@ -193,6 +195,10 @@ impl PathFileSystem for MemoryFileSystem { } Ok(()) } + + fn get_capacity(&self) -> Result { + Ok(FileSystemCapacity {}) + } } pub(crate) struct MemoryFileReader { diff --git a/clients/filesystem-fuse/src/mount.rs b/clients/filesystem-fuse/src/mount.rs deleted file mode 100644 index 102e2401643..00000000000 --- a/clients/filesystem-fuse/src/mount.rs +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -use crate::default_raw_filesystem::DefaultRawFileSystem; -use crate::filesystem::FileSystemContext; -use crate::fuse_api_handle::FuseApiHandle; -use crate::fuse_server::FuseServer; -use crate::memory_filesystem::MemoryFileSystem; -use fuse3::raw::Filesystem; -use log::info; -use once_cell::sync::Lazy; -use std::sync::Arc; -use tokio::sync::Mutex; - -static SERVER: Lazy>>> = Lazy::new(|| Mutex::new(None)); - -pub async fn mount(mount_point: &str) -> fuse3::Result<()> { - info!("Starting gvfs-fuse server..."); - let svr = Arc::new(FuseServer::new(mount_point)); - { - let mut server = SERVER.lock().await; - *server = Some(svr.clone()); - } - let fs = create_fuse_fs().await; - svr.start(fs).await -} - -pub async fn unmount() { - info!("Stop gvfs-fuse server..."); - let svr = { - let mut server = SERVER.lock().await; - if server.is_none() { - info!("Server is already stopped."); - return; - } - server.take().unwrap() - }; - let _ = svr.stop().await; -} - -pub async fn create_fuse_fs() -> impl Filesystem + Sync + 'static { - let uid = unsafe { libc::getuid() }; - let gid = unsafe { libc::getgid() }; - let fs_context = FileSystemContext { - uid: uid, - gid: gid, - default_file_perm: 0o644, - default_dir_perm: 0o755, - block_size: 4 * 1024, - }; - - let gvfs = MemoryFileSystem::new().await; - let fs = DefaultRawFileSystem::new(gvfs); - FuseApiHandle::new(fs, fs_context) -} - -pub async fn create_gvfs_filesystem() { - // Gvfs-fuse filesystem structure: - // FuseApiHandle - // ├─ DefaultRawFileSystem (RawFileSystem) - // │ └─ FileSystemLog (PathFileSystem) - // │ ├─ GravitinoComposedFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ S3FileSystem (PathFileSystem) - // │ │ │ └─ OpenDALFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ HDFSFileSystem (PathFileSystem) - // │ │ │ └─ OpenDALFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ JuiceFileSystem (PathFileSystem) - // │ │ │ └─ NasFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ XXXFileSystem (PathFileSystem) - // - // `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs. - // It manages file and directory relationships, as well as file mappings. - // It delegates file operations to the PathFileSystem - // - // `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs. - // Similar implementations include permissions, caching, and metrics. - // - // `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`. - // It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage. - // If the user only mounts a fileset, this layer is not present. There will only be one below layer. - // - // `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path. - // and delegate the operation to the real storage. - // - // `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage. - // it can assess the S3, HDFS, gcs, azblob and other storage. - // - // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage. - // - // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage. - // - // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS. - // - // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage. - // - // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. - - todo!("Implement the createGvfsFuseFileSystem function"); -} diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs index 21e52f86af8..bbc8d7d7f8a 100644 --- a/clients/filesystem-fuse/src/utils.rs +++ b/clients/filesystem-fuse/src/utils.rs @@ -16,6 +16,9 @@ * specific language governing permissions and limitations * under the License. */ +use crate::error::GvfsError; + +pub type GvfsResult = Result; #[cfg(test)] mod tests {} diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml new file mode 100644 index 00000000000..013df6cfc31 --- /dev/null +++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_memory.toml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# fuse settings +[fuse] +file_mask= 0o600 +dir_mask= 0o700 +fs_type = "memory" + +[fuse.properties] +key1 = "value1" +key2 = "value2" + +# filesystem settings +[filesystem] +block_size = 8192 + +# Gravitino settings +[gravitino] +uri = "http://localhost:8090" +metalake = "test" + +# extent settings +[extent_config] +access_key = "XXX_access_key" +secret_key = "XXX_secret_key" diff --git a/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml b/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml new file mode 100644 index 00000000000..ff7c6936f37 --- /dev/null +++ b/clients/filesystem-fuse/tests/conf/gvfs_fuse_test.toml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# fuse settings +[fuse] +file_mask= 0o644 +dir_mask= 0o755 +fs_type = "memory" + +[fuse.properties] +key1 = "value1" +key2 = "value2" + +# filesystem settings +[filesystem] +block_size = 8192 + +# Gravitino settings +[gravitino] +uri = "http://localhost:8090" +metalake = "test" + +# extent settings +[extend_config] +access_key = "XXX_access_key" +secret_key = "XXX_secret_key" diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs index 23aafbaf6e4..e761fabc5b6 100644 --- a/clients/filesystem-fuse/tests/fuse_test.rs +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -17,6 +17,7 @@ * under the License. */ +use gvfs_fuse::config::AppConfig; use gvfs_fuse::{gvfs_mount, gvfs_unmount}; use log::info; use std::fs; @@ -38,15 +39,18 @@ impl FuseTest { pub fn setup(&mut self) { info!("Start gvfs fuse server"); let mount_point = self.mount_point.clone(); + + let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml")) + .expect("Failed to load config"); self.runtime - .spawn(async move { gvfs_mount(&mount_point).await }); + .spawn(async move { gvfs_mount(&mount_point, "", &config).await }); let success = Self::wait_for_fuse_server_ready(&self.mount_point, Duration::from_secs(15)); assert!(success, "Fuse server cannot start up at 15 seconds"); } pub fn shutdown(&mut self) { self.runtime.block_on(async { - gvfs_unmount().await; + let _ = gvfs_unmount().await; }); } @@ -76,7 +80,7 @@ impl Drop for FuseTest { fn test_fuse_system_with_auto() { tracing_subscriber::fmt().init(); - let mount_point = "build/gvfs"; + let mount_point = "target/gvfs"; let _ = fs::create_dir_all(mount_point); let mut test = FuseTest {