diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/Cargo.toml index 75a4dd71301..be543ebe337 100644 --- a/clients/filesystem-fuse/Cargo.toml +++ b/clients/filesystem-fuse/Cargo.toml @@ -43,3 +43,10 @@ log = "0.4.22" once_cell = "1.20.2" tokio = { version = "1.38.0", features = ["full"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +serde = { version = "1.0.216", features = ["derive"] } +toml = "0.8.19" +reqwest = { version = "0.12.9", features = ["json"] } +urlencoding = "2.1.3" + +[dev-dependencies] +mockito = "0.31" diff --git a/clients/filesystem-fuse/conf/gvfs_test.toml b/clients/filesystem-fuse/conf/gvfs_test.toml new file mode 100644 index 00000000000..df7779322e3 --- /dev/null +++ b/clients/filesystem-fuse/conf/gvfs_test.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# fuse settings +[fuse] +default_mask = 0o600 + +[fuse.properties] +key1 = "value1" +key2 = "value2" + +# filesystem settings +[filesystem] +block_size = 8192 + +# Gravitino settings +[gravitino] +gravitino_url = "http://localhost:8090" +metalake = "test" + +# extent settings +[extent_config] +access_key = "XXX_access_key" +secret_key = "XXX_secret_key" diff --git a/clients/filesystem-fuse/src/config.rs b/clients/filesystem-fuse/src/config.rs new file mode 100644 index 00000000000..6e64627aa7d --- /dev/null +++ b/clients/filesystem-fuse/src/config.rs @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use serde::Deserialize; +use std::collections::HashMap; + +#[derive(Debug, Deserialize)] +pub struct Config { + pub fuse: FuseConfig, + pub filesystem: FilesystemConfig, + pub gravitino: GravitinoConfig, + pub extent_config: HashMap, +} + +impl Config { + pub fn from_file(file: &str) -> Config { + let config_content = std::fs::read_to_string(file).unwrap(); + let config = toml::from_str::(&config_content).unwrap(); + config + } + + pub fn default() -> Config { + Config { + fuse: FuseConfig { + default_mask: 0o600, + fs_type: "memory".to_string(), + properties: HashMap::new(), + }, + filesystem: FilesystemConfig { block_size: 4096 }, + gravitino: GravitinoConfig { + gravitino_url: "http://localhost:8090".to_string(), + metalake: "test".to_string(), + }, + extent_config: HashMap::new(), + } + } +} + +#[derive(Debug, Deserialize)] +pub struct FuseConfig { + pub default_mask: u32, + pub fs_type: String, + pub properties: HashMap, +} + +#[derive(Debug, Deserialize)] +pub struct FilesystemConfig { + pub block_size: u32, +} + +#[derive(Debug, Deserialize)] +pub struct GravitinoConfig { + pub gravitino_url: String, + pub metalake: String, +} + +#[cfg(test)] +mod test { + use crate::config::Config; + + #[test] + fn test_config_from_file() { + let config = Config::from_file("conf/gvfs_test.toml"); + assert_eq!(config.fuse.default_mask, 0o600); + assert_eq!(config.filesystem.block_size, 8192); + assert_eq!(config.gravitino.gravitino_url, "http://localhost:8090"); + assert_eq!(config.gravitino.metalake, "test"); + assert_eq!(config.extent_config.get("access_key"), Some(&"XXX_access_key".to_string())); + assert_eq!(config.extent_config.get("secret_key"), Some(&"XXX_secret_key".to_string())); + } +} diff --git a/clients/filesystem-fuse/src/default_raw_filesystem.rs b/clients/filesystem-fuse/src/default_raw_filesystem.rs index 0ab92e91640..efb436f413f 100644 --- a/clients/filesystem-fuse/src/default_raw_filesystem.rs +++ b/clients/filesystem-fuse/src/default_raw_filesystem.rs @@ -16,10 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -use crate::filesystem::{ - FileStat, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, ROOT_DIR_FILE_ID, - ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH, -}; +use crate::filesystem::{FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH}; use crate::opened_file::{FileHandle, OpenFileFlags}; use crate::opened_file_manager::OpenedFileManager; use async_trait::async_trait; @@ -30,6 +27,7 @@ use std::ffi::OsStr; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicU64; use tokio::sync::RwLock; +use crate::config::Config; /// DefaultRawFileSystem is a simple implementation for the file system. /// it is used to manage the file metadata and file handle. @@ -47,7 +45,7 @@ pub struct DefaultRawFileSystem { } impl DefaultRawFileSystem { - pub(crate) fn new(fs: T) -> Self { + pub(crate) fn new(fs: T, _config: &Config, _fs_context: &FileSystemContext) -> Self { Self { file_entry_manager: RwLock::new(FileEntryManager::new()), opened_file_manager: OpenedFileManager::new(), diff --git a/clients/filesystem-fuse/src/error.rs b/clients/filesystem-fuse/src/error.rs new file mode 100644 index 00000000000..707f3119848 --- /dev/null +++ b/clients/filesystem-fuse/src/error.rs @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use fuse3::Errno; + +#[derive(Debug)] +pub enum ErrorCode { + UnSupportedFilesystem, + GravitinoClientError, +} + +impl ErrorCode { + pub fn to_string(&self) -> String { + match self { + ErrorCode::UnSupportedFilesystem => "The filesystem is not supported".to_string(), + _ => "".to_string(), + } + } + pub fn to_error(self, message: impl Into) -> GvfsError { + GvfsError::Error(self, message.into()) + } +} + +#[derive(Debug)] +pub enum GvfsError { + RestError(String, reqwest::Error), + Error(ErrorCode, String), + Errno(Errno), + IOError(std::io::Error), +} +impl From for GvfsError { + fn from(err: reqwest::Error) -> Self { + GvfsError::RestError("Http request failed:".to_owned() + &err.to_string(), err) + } +} + +impl From for GvfsError { + fn from(errno: Errno) -> Self { + GvfsError::Errno(errno) + } +} + +impl From for GvfsError { + fn from(err: std::io::Error) -> Self { + GvfsError::IOError(err) + } +} \ No newline at end of file diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs index d9440b0e652..84c1c970620 100644 --- a/clients/filesystem-fuse/src/filesystem.rs +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -121,6 +121,7 @@ pub(crate) trait PathFileSystem: Send + Sync { /// Create the directory by file path , if successful, return the file stat async fn create_dir(&self, path: &Path) -> Result; + /// Set the file attribute by file path and file stat async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()>; @@ -129,6 +130,8 @@ pub(crate) trait PathFileSystem: Send + Sync { /// Remove the directory by file path async fn remove_dir(&self, path: &Path) -> Result<()>; + + fn get_capacity(&self) -> Result; } // FileSystemContext is the system environment for the fuse file system. @@ -161,6 +164,9 @@ impl FileSystemContext { } } +// capacity of the file system +pub struct FileSystemCapacity {} + // FileStat is the file metadata of the file #[derive(Clone, Debug)] pub struct FileStat { diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs b/clients/filesystem-fuse/src/fuse_api_handle.rs index 1f24e94ee86..d5c4e4683fe 100644 --- a/clients/filesystem-fuse/src/fuse_api_handle.rs +++ b/clients/filesystem-fuse/src/fuse_api_handle.rs @@ -33,6 +33,7 @@ use futures_util::StreamExt; use std::ffi::{OsStr, OsString}; use std::num::NonZeroU32; use std::time::{Duration, SystemTime}; +use crate::config::Config; pub(crate) struct FuseApiHandle { fs: T, @@ -44,7 +45,7 @@ impl FuseApiHandle { const DEFAULT_ATTR_TTL: Duration = Duration::from_secs(1); const DEFAULT_MAX_WRITE_SIZE: u32 = 16 * 1024; - pub fn new(fs: T, context: FileSystemContext) -> Self { + pub fn new(fs: T, _config: &Config, context: FileSystemContext) -> Self { Self { fs: fs, default_ttl: Self::DEFAULT_ATTR_TTL, diff --git a/clients/filesystem-fuse/src/fuse_server.rs b/clients/filesystem-fuse/src/fuse_server.rs index dae7c28a631..962f09a5f94 100644 --- a/clients/filesystem-fuse/src/fuse_server.rs +++ b/clients/filesystem-fuse/src/fuse_server.rs @@ -17,12 +17,13 @@ * under the License. */ use fuse3::raw::{Filesystem, Session}; -use fuse3::{MountOptions, Result}; +use fuse3::{MountOptions}; use log::{error, info}; use std::process::exit; use std::sync::Arc; use tokio::select; use tokio::sync::Notify; +use crate::utils::GvfsResult; /// Represents a FUSE server capable of starting and stopping the FUSE filesystem. pub struct FuseServer { @@ -43,7 +44,7 @@ impl FuseServer { } /// Starts the FUSE filesystem and blocks until it is stopped. - pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> Result<()> { + pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> GvfsResult<()> { //check if the mount point exists if !std::path::Path::new(&self.mount_point).exists() { error!("Mount point {} does not exist", self.mount_point); @@ -83,11 +84,12 @@ impl FuseServer { } /// Stops the FUSE filesystem. - pub async fn stop(&self) { + pub async fn stop(&self) -> GvfsResult<()>{ info!("Stopping FUSE filesystem..."); self.close_notify.notify_one(); // wait for the filesystem to stop self.close_notify.notified().await; + Ok(()) } } diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs new file mode 100644 index 00000000000..be816dfdd6c --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_client.rs @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::GravitinoConfig; +use crate::error::{ErrorCode, GvfsError}; +use reqwest::Client; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt::{Debug}; +use urlencoding::encode; + +#[derive(Debug, Deserialize)] +pub(crate) struct Fileset { + pub(crate) name: String, + #[serde(rename = "type")] + pub(crate) fileset_type: String, + comment: String, + #[serde(rename = "storageLocation")] + pub(crate) storage_location: String, + properties: HashMap, +} + +#[derive(Debug, Deserialize)] +struct FilesetResponse { + code: i32, + fileset: Fileset, +} + +#[derive(Debug, Deserialize)] +struct FileLocationResponse { + code: i32, + #[serde(rename = "fileLocation")] + location: String, +} + +pub(crate) struct GravitinoClient { + gravitino_uri: String, + metalake: String, + + http_client: Client, +} + +impl GravitinoClient { + pub fn new(config: &GravitinoConfig) -> Self { + Self { + gravitino_uri: config.gravitino_url.clone(), + metalake: config.metalake.clone(), + http_client: Client::new(), + } + } + + pub fn init(&self) {} + + pub fn do_post(&self, path: &str, data: &str) { + println!("POST request to {} with data: {}", path, data); + } + + pub fn do_get(&self, path: &str) { + println!("GET request to {}", path); + } + + pub fn request(&self, path: &str, _data: &str) -> Result<(), GvfsError> { + todo!() + } + + pub fn list_schema(&self) -> Result<(), GvfsError> { + todo!() + } + + pub fn list_fileset(&self) -> Result<(), GvfsError> { + todo!() + } + + fn get_fileset_url(&self, catalog_name: &str, schema_name: &str, fileset_name: &str) -> String { + format!( + "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}", + self.gravitino_uri, self.metalake, catalog_name, schema_name, fileset_name + ) + } + + async fn send_and_parse(&self, url: &str) -> Result + where + T: for<'de> Deserialize<'de>, + { + let http_resp = self.http_client.get(url).send().await.map_err(|e| { + GvfsError::RestError(format!("Failed to send request to {}", url), e) + })?; + + let res = http_resp.json::().await.map_err(|e| { + GvfsError::RestError(format!("Failed to parse response from {}", url), e) + })?; + + Ok(res) + } + + pub async fn get_fileset( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + ) -> Result { + let url = self.get_fileset_url(catalog_name, schema_name, fileset_name); + let res = self.send_and_parse::(&url).await?; + + if res.code != 0 { + return Err(GvfsError::Error( + ErrorCode::GravitinoClientError, + "Failed to get fileset".to_string(), + )); + } + Ok(res.fileset) + } + + pub fn get_file_location_url( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + path: &str, + ) -> String { + let encoded_path = encode(path); + format!( + "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}", + self.gravitino_uri, self.metalake, catalog_name, schema_name, fileset_name, encoded_path + ) + } + + pub async fn get_file_location( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + path: &str, + ) -> Result { + let url = self.get_file_location_url(catalog_name, schema_name, fileset_name, path); + let res = self.send_and_parse::(&url).await?; + + if res.code != 0 { + return Err(GvfsError::Error( + ErrorCode::GravitinoClientError, + "Failed to get file location".to_string(), + )); + } + Ok(res.location) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::mock; + use tokio; + + #[tokio::test] + async fn test_get_fileset_success() { + tracing_subscriber::fmt::init(); + let fileset_response = r#" + { + "code": 0, + "fileset": { + "name": "example_fileset", + "type": "example_type", + "comment": "This is a test fileset", + "storageLocation": "/example/path", + "properties": { + "key1": "value1", + "key2": "value2" + } + } + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!( + "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}", + "test", "catalog1", "schema1", "fileset1" + ); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(fileset_response) + .create(); + + let config = GravitinoConfig { + gravitino_url: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client.get_fileset("catalog1", "schema1", "fileset1").await; + + match result { + Ok(fileset) => { + assert_eq!(fileset.name, "example_fileset"); + assert_eq!(fileset.fileset_type, "example_type"); + assert_eq!(fileset.storage_location, "/example/path"); + assert_eq!(fileset.properties.get("key1"), Some(&"value1".to_string())); + } + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } + + #[tokio::test] + async fn test_get_file_location_success() { + tracing_subscriber::fmt::init(); + let file_location_response = r#" + { + "code": 0, + "fileLocation": "/mybucket/a" + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!( + "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}", + "test", "catalog1", "schema1", "fileset1", "/example/path" + ); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(file_location_response) + .create(); + + let config = GravitinoConfig { + gravitino_url: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client + .get_file_location("catalog1", "schema1", "fileset1", "/example/path") + .await; + + match result { + Ok(location) => { + assert_eq!(location, "/mybucket/a"); + } + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } + + #[tokio::test] + async fn test1() { + tracing_subscriber::fmt::init(); + let config = GravitinoConfig { + gravitino_url: "http://localhost:8090".to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + client.init(); + let result= client.get_fileset("c1", "s1", "fileset1").await; + if let Err(e) = &result { + println!("{:?}", e); + } + + let fileset= result.unwrap(); + println!("{:?}", fileset); + assert!(fileset.name == "fileset1"); + } +} diff --git a/clients/filesystem-fuse/src/gvfs_fileset_fs.rs b/clients/filesystem-fuse/src/gvfs_fileset_fs.rs new file mode 100644 index 00000000000..db1208f9838 --- /dev/null +++ b/clients/filesystem-fuse/src/gvfs_fileset_fs.rs @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use std::path::{Path, PathBuf}; +use crate::config::Config; +use crate::filesystem::{ + FileStat, FileSystemCapacity, FileSystemContext, PathFileSystem, + Result, +}; +use crate::gravitino_client::GravitinoClient; +use async_trait::async_trait; +use crate::opened_file::{OpenFileFlags, OpenedFile}; +use crate::storage_filesystem::StorageFileSystem; + +pub(crate) struct GravitinoFileSystemConfig {} + +pub(crate) struct GvfsFilesetFs { + fs: StorageFileSystem, + client: GravitinoClient, + fileset_location: PathBuf, +} + +impl GvfsFilesetFs { + pub async fn new(mount_from: &str, config: &Config, context: &FileSystemContext) -> Self { + todo!("GravitinoFileSystem::new") + } + + fn map_to_raw_path(&self, path: &Path) -> PathBuf{ + if path == Path::new("/") { + return self.fileset_location.clone(); + } + self.fileset_location.join(path) + } +} + +#[async_trait] +impl PathFileSystem for GvfsFilesetFs { + async fn init(&self) -> Result<()> { + self.fs.init().await + } + + async fn stat(&self, path:&Path) -> Result { + let raw_path= self.map_to_raw_path(path); + self.fs.stat(&raw_path).await + } + + async fn read_dir(&self, path:&Path) -> Result> { + let raw_path = self.map_to_raw_path(path); + self.fs.read_dir(&raw_path).await + } + + async fn open_file(&self, path:&Path, flags: OpenFileFlags) -> Result { + let raw_path = self.map_to_raw_path(path); + self.fs.open_file(&raw_path, flags).await + } + + async fn open_dir(&self, path:&Path, flags: OpenFileFlags) -> Result { + let raw_path = self.map_to_raw_path(path); + self.fs.open_dir(&raw_path, flags).await + } + + async fn create_file( + &self, + path: &Path, + flags: OpenFileFlags, + ) -> Result { + let raw_path = self.map_to_raw_path(path); + self.fs.create_file(&raw_path, flags).await + } + + async fn create_dir(&self, path: &Path) -> Result { + let raw_path= self.map_to_raw_path(path); + self.fs.create_dir(&raw_path).await + } + + async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()> { + let raw_path = self.map_to_raw_path(path); + self.fs.set_attr(&raw_path, file_stat, flush).await + } + + async fn remove_file(&self, path: &Path) -> Result<()> { + let raw_path = self.map_to_raw_path(path); + self.fs.remove_file(&raw_path).await + } + + async fn remove_dir(&self, path: &Path) -> Result<()> { + let raw_path = self.map_to_raw_path(path); + self.fs.remove_dir(&raw_path).await + } + + fn get_capacity(&self) -> Result { + self.fs.get_capacity() + } +} diff --git a/clients/filesystem-fuse/src/gvfs_mutiple_fileset_fs.rs b/clients/filesystem-fuse/src/gvfs_mutiple_fileset_fs.rs new file mode 100644 index 00000000000..9bf7931e0e3 --- /dev/null +++ b/clients/filesystem-fuse/src/gvfs_mutiple_fileset_fs.rs @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::{ + FileStat, FileSystemCapacity, PathFileSystem, Result, +}; +use crate::gravitino_client::GravitinoClient; +use crate::opened_file::{OpenFileFlags, OpenedFile}; +use crate::storage_filesystem::StorageFileSystem; +use async_trait::async_trait; +use dashmap::DashMap; +use std::path::Path; + +pub(crate) struct GvfsMultiFilesetFs { + // meta is the metadata of the filesystem + client: GravitinoClient, + inner_fs: DashMap, +} + +impl GvfsMultiFilesetFs { + pub fn new(client: GravitinoClient) -> Self { + Self { + client, + inner_fs: Default::default(), + } + } +} + +#[async_trait] +impl PathFileSystem for GvfsMultiFilesetFs { + async fn init(&self) -> Result<()> { + todo!() + } + + async fn stat(&self, path: &Path) -> Result { + todo!() + } + + async fn read_dir(&self, path: &Path) -> Result> { + todo!() + } + + async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + todo!() + } + + async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result { + todo!() + } + + async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + todo!() + } + + async fn create_dir(&self, path: &Path) -> Result { + todo!() + } + + async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()> { + todo!() + } + + async fn remove_file(&self, path: &Path) -> Result<()> { + todo!() + } + + async fn remove_dir(&self, path: &Path) -> Result<()> { + todo!() + } + + fn get_capacity(&self) -> Result { + todo!() + } +} \ No newline at end of file diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs index 36e8c28d343..68d046aaac6 100644 --- a/clients/filesystem-fuse/src/lib.rs +++ b/clients/filesystem-fuse/src/lib.rs @@ -16,6 +16,9 @@ * specific language governing permissions and limitations * under the License. */ +use crate::config::Config; +use crate::utils::GvfsResult; + mod default_raw_filesystem; mod filesystem; mod fuse_api_handle; @@ -25,11 +28,17 @@ mod mount; mod opened_file; mod opened_file_manager; mod utils; +pub mod config; +mod error; +mod gvfs_fileset_fs; +mod gvfs_mutiple_fileset_fs; +mod gravitino_client; +mod storage_filesystem; -pub async fn gvfs_mount(mount_point: &str) -> fuse3::Result<()> { - mount::mount(mount_point).await +pub async fn gvfs_mount(mount_to: &str, mount_from: &str, config: &Config) -> GvfsResult<()> { + mount::mount(mount_to, mount_from, &config).await } -pub async fn gvfs_unmount() { - mount::unmount().await; +pub async fn gvfs_unmount() -> GvfsResult<()>{ + mount::unmount().await } diff --git a/clients/filesystem-fuse/src/main.rs b/clients/filesystem-fuse/src/main.rs index 28866a9bb1c..43708f179fc 100644 --- a/clients/filesystem-fuse/src/main.rs +++ b/clients/filesystem-fuse/src/main.rs @@ -19,15 +19,18 @@ use gvfs_fuse::{gvfs_mount, gvfs_unmount}; use log::info; use tokio::signal; +use gvfs_fuse::config::Config; #[tokio::main] async fn main() -> fuse3::Result<()> { tracing_subscriber::fmt().init(); - tokio::spawn(async { gvfs_mount("gvfs").await }); + + let config = Config::default(); + tokio::spawn(async move { gvfs_mount("gvfs", "", &config).await }); let _ = signal::ctrl_c().await; info!("Received Ctrl+C, Unmounting gvfs..."); - gvfs_unmount().await; + let _= gvfs_unmount().await; Ok(()) } diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs b/clients/filesystem-fuse/src/memory_filesystem.rs index a7a6bf5b416..74176ed3700 100644 --- a/clients/filesystem-fuse/src/memory_filesystem.rs +++ b/clients/filesystem-fuse/src/memory_filesystem.rs @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -use crate::filesystem::{FileReader, FileStat, FileWriter, PathFileSystem, Result}; +use crate::filesystem::{FileReader, FileStat, FileSystemCapacity, FileWriter, PathFileSystem, Result}; use crate::opened_file::{OpenFileFlags, OpenedFile}; use async_trait::async_trait; use bytes::Bytes; @@ -193,6 +193,10 @@ impl PathFileSystem for MemoryFileSystem { } Ok(()) } + + fn get_capacity(&self) -> Result { + Ok(FileSystemCapacity {}) + } } pub(crate) struct MemoryFileReader { diff --git a/clients/filesystem-fuse/src/mount.rs b/clients/filesystem-fuse/src/mount.rs index 102e2401643..86eaed182a8 100644 --- a/clients/filesystem-fuse/src/mount.rs +++ b/clients/filesystem-fuse/src/mount.rs @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ +use std::future::Future; use crate::default_raw_filesystem::DefaultRawFileSystem; -use crate::filesystem::FileSystemContext; +use crate::filesystem::{FileSystemContext, PathFileSystem}; use crate::fuse_api_handle::FuseApiHandle; use crate::fuse_server::FuseServer; use crate::memory_filesystem::MemoryFileSystem; @@ -26,34 +27,55 @@ use log::info; use once_cell::sync::Lazy; use std::sync::Arc; use tokio::sync::Mutex; +use crate::config::Config; +use crate::gvfs_fileset_fs::GvfsFilesetFs; +use crate::mount; +use crate::utils::GvfsResult; static SERVER: Lazy>>> = Lazy::new(|| Mutex::new(None)); -pub async fn mount(mount_point: &str) -> fuse3::Result<()> { +enum CreateFsResult { + Memory(MemoryFileSystem), + Gvfs(GvfsFilesetFs), + FuseMemoryFs(FuseApiHandle>), + FuseGvfs(FuseApiHandle>), + None, +} + +pub async fn mount(mount_to: &str, mount_from: &str, config: &Config) -> GvfsResult<()> { info!("Starting gvfs-fuse server..."); - let svr = Arc::new(FuseServer::new(mount_point)); + let svr = Arc::new(FuseServer::new(mount_to)); { let mut server = SERVER.lock().await; *server = Some(svr.clone()); } - let fs = create_fuse_fs().await; - svr.start(fs).await + let fs = create_fuse_fs(mount_from, config).await; + match fs { + CreateFsResult::FuseMemoryFs(vfs) | + CreateFsResult::FuseGvfs(vfs) => { + svr.start(vfs).await?; + Ok(()) + } + _ => { + Ok(()) + } + } } -pub async fn unmount() { +pub async fn unmount() -> GvfsResult<()> { info!("Stop gvfs-fuse server..."); let svr = { let mut server = SERVER.lock().await; if server.is_none() { info!("Server is already stopped."); - return; + return Ok(()); } server.take().unwrap() }; - let _ = svr.stop().await; + svr.stop().await } -pub async fn create_fuse_fs() -> impl Filesystem + Sync + 'static { +pub async fn create_fuse_fs(mount_from: &str, config: &Config) -> CreateFsResult { let uid = unsafe { libc::getuid() }; let gid = unsafe { libc::getgid() }; let fs_context = FileSystemContext { @@ -64,12 +86,25 @@ pub async fn create_fuse_fs() -> impl Filesystem + Sync + 'static { block_size: 4 * 1024, }; - let gvfs = MemoryFileSystem::new().await; - let fs = DefaultRawFileSystem::new(gvfs); - FuseApiHandle::new(fs, fs_context) + let gvfs = create_gvfs_filesystem(mount_from, config, &fs_context).await; + match gvfs { + CreateFsResult::Memory(fs) => { + let fs = + FuseApiHandle::new(DefaultRawFileSystem::new(fs, config, &fs_context), config, fs_context); + CreateFsResult::FuseMemoryFs(fs) + } + CreateFsResult::Gvfs(fs) => { + let fs = + FuseApiHandle::new(DefaultRawFileSystem::new(fs, config, &fs_context), config, fs_context); + CreateFsResult::FuseGvfs(fs) + } + _ => { + CreateFsResult::None + } + } } -pub async fn create_gvfs_filesystem() { +pub async fn create_gvfs_filesystem(mount_from: &str, config: &Config, fs_context: &FileSystemContext) -> CreateFsResult { // Gvfs-fuse filesystem structure: // FuseApiHandle // ├─ DefaultRawFileSystem (RawFileSystem) @@ -114,5 +149,11 @@ pub async fn create_gvfs_filesystem() { // // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. - todo!("Implement the createGvfsFuseFileSystem function"); + + if (config.fuse.fs_type == "memory") { + CreateFsResult::Memory(MemoryFileSystem::new().await) + } else { + CreateFsResult::Gvfs( + GvfsFilesetFs::new(mount_from, config, &fs_context).await) + } } diff --git a/clients/filesystem-fuse/src/storage_filesystem.rs b/clients/filesystem-fuse/src/storage_filesystem.rs new file mode 100644 index 00000000000..3afa838a24c --- /dev/null +++ b/clients/filesystem-fuse/src/storage_filesystem.rs @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::Config; +use crate::filesystem::{FileStat, FileSystemCapacity, Result}; +use crate::filesystem::{FileSystemContext, PathFileSystem}; +use crate::memory_filesystem::MemoryFileSystem; +use async_trait::async_trait; +use std::fmt; +use std::path::Path; +use crate::opened_file::{OpenFileFlags, OpenedFile}; +use crate::utils::GvfsResult; + +pub enum StorageFileSystemType { + S3, +} + +impl fmt::Display for StorageFileSystemType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + StorageFileSystemType::S3 => write!(f, "s3"), + } + } +} + +pub(crate) enum StorageFileSystem { + MemoryStorage(MemoryFileSystem), +} + +impl StorageFileSystem { + pub(crate) async fn new( + fs_type: &StorageFileSystemType, + config: &Config, + context: &FileSystemContext, + root: &str + ) -> GvfsResult { + todo!() + } +} + +macro_rules! async_call_fun { + ($self:expr, $fun:ident $(, $args:expr)* ) => { + match $self { + StorageFileSystem::MemoryStorage(fs) => fs.$fun($($args),*).await, + } + }; +} + +macro_rules! call_fun { + ($self:expr, $fun:ident $(, $args:expr)* ) => { + match $self { + StorageFileSystem::MemoryStorage(fs) => fs.$fun($($args),*), + } + }; +} + +#[async_trait] +impl PathFileSystem for StorageFileSystem { + async fn init(&self) -> Result<()> { + async_call_fun!(self, init) + } + + async fn stat(&self, path: &Path) -> Result { + async_call_fun!(self, stat, path) + } + + async fn read_dir(&self, path: &Path) -> Result> { + async_call_fun!(self, read_dir, path) + } + + async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + async_call_fun!(self, open_file, path, flags) + } + + async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result { + async_call_fun!(self, open_dir, path, flags) + } + + async fn create_file( + &self, + path: &Path, + flags: OpenFileFlags, + ) -> Result { + async_call_fun!(self, create_file, path, flags) + } + + async fn create_dir(&self, path: &Path) -> Result { + async_call_fun!(self, create_dir, path) + } + + async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()> { + async_call_fun!(self, set_attr, path, file_stat, flush) + } + + async fn remove_file(&self, path: &Path) -> Result<()> { + async_call_fun!(self, remove_file, path) + } + + async fn remove_dir(&self, path: &Path) -> Result<()> { + async_call_fun!(self, remove_dir, path) + } + + fn get_capacity(&self) -> Result { + call_fun!(self, get_capacity) + } +} diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs index 21e52f86af8..bbc8d7d7f8a 100644 --- a/clients/filesystem-fuse/src/utils.rs +++ b/clients/filesystem-fuse/src/utils.rs @@ -16,6 +16,9 @@ * specific language governing permissions and limitations * under the License. */ +use crate::error::GvfsError; + +pub type GvfsResult = Result; #[cfg(test)] mod tests {}