diff --git a/clients/filesystem-fuse/src/gvfs_creator.rs b/clients/filesystem-fuse/src/gvfs_creator.rs new file mode 100644 index 00000000000..1e35bef9c45 --- /dev/null +++ b/clients/filesystem-fuse/src/gvfs_creator.rs @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::config::AppConfig; +use crate::error::ErrorCode::InvalidConfig; +use crate::filesystem::{FileSystemContext, PathFileSystem}; +use crate::gravitino_client::GravitinoClient; +use crate::gvfs_fileset_fs::GvfsFilesetFs; +use crate::gvfs_fuse::{CreateFsResult, FileSystemSchema}; +use crate::open_dal_filesystem::OpenDalFileSystem; +use crate::utils::GvfsResult; +use std::path::Path; + +const FILESET_PREFIX: &str = "gvfs://fileset/"; + +pub async fn create_gvfs_filesystem( + mount_from: &str, + config: &AppConfig, + fs_context: &FileSystemContext, +) -> GvfsResult { + // Gvfs-fuse filesystem structure: + // FuseApiHandle + // ├─ DefaultRawFileSystem (RawFileSystem) + // │ └─ FileSystemLog (PathFileSystem) + // │ ├─ GravitinoComposedFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ S3FileSystem (PathFileSystem) + // │ │ │ └─ OpenDALFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ HDFSFileSystem (PathFileSystem) + // │ │ │ └─ OpenDALFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ JuiceFileSystem (PathFileSystem) + // │ │ │ └─ NasFileSystem (PathFileSystem) + // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) + // │ │ │ └─ XXXFileSystem (PathFileSystem) + // + // `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs. + // It manages file and directory relationships, as well as file mappings. + // It delegates file operations to the PathFileSystem + // + // `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs. + // Similar implementations include permissions, caching, and metrics. + // + // `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`. + // It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage. + // If the user only mounts a fileset, this layer is not present. There will only be one below layer. + // + // `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path. + // and delegate the operation to the real storage. + // + // `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage. + // it can assess the S3, HDFS, gcs, azblob and other storage. + // + // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage. + // + // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage. + // + // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS. + // + // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage. + // + // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. + + let client = GravitinoClient::new(&config.gravitino); + + let (catalog, schema, fileset) = extract_fileset(mount_from)?; + let location = client + .get_fileset(&catalog, &schema, &fileset) + .await? + .storage_location; + let (schema, location) = extract_storage_filesystem(&location).unwrap(); + + let inner_fs = create_fs_by_schema(&schema, config, fs_context)?; + + let fs = GvfsFilesetFs::new(inner_fs, Path::new(&location), client, config, fs_context).await; + Ok(CreateFsResult::Gvfs(fs)) +} + +fn create_fs_by_schema( + schema: &FileSystemSchema, + config: &AppConfig, + fs_context: &FileSystemContext, +) -> GvfsResult> { + match schema { + FileSystemSchema::S3 => OpenDalFileSystem::create_file_system(schema, config, fs_context), + } +} + +pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> { + if !path.starts_with(FILESET_PREFIX) { + return Err(InvalidConfig.to_error("Invalid fileset path".to_string())); + } + + let path_without_prefix = &path[FILESET_PREFIX.len()..]; + + let parts: Vec<&str> = path_without_prefix.split('/').collect(); + + if parts.len() != 3 { + return Err(InvalidConfig.to_error("Invalid fileset path".to_string())); + } + // todo handle mount catalog or schema + + let catalog = parts[1].to_string(); + let schema = parts[2].to_string(); + let fileset = parts[3].to_string(); + + Ok((catalog, schema, fileset)) +} + +pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema, String)> { + // todo need to improve the logic + if let Some(pos) = path.find("://") { + let protocol = &path[..pos]; + let location = &path[pos + 3..]; + let location = match location.find('/') { + Some(index) => &location[index + 1..], + None => "", + }; + let location = match location.ends_with('/') { + true => location.to_string(), + false => format!("{}/", location), + }; + + match protocol { + "s3" => Some((FileSystemSchema::S3, location.to_string())), + "s3a" => Some((FileSystemSchema::S3, location.to_string())), + _ => None, + } + } else { + None + } +} diff --git a/clients/filesystem-fuse/src/gvfs_fuse.rs b/clients/filesystem-fuse/src/gvfs_fuse.rs index 30c9b740ba0..fe339c6023b 100644 --- a/clients/filesystem-fuse/src/gvfs_fuse.rs +++ b/clients/filesystem-fuse/src/gvfs_fuse.rs @@ -18,23 +18,19 @@ */ use crate::config::AppConfig; use crate::default_raw_filesystem::DefaultRawFileSystem; -use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem}; -use crate::filesystem::{FileSystemContext, PathFileSystem}; +use crate::error::ErrorCode::UnSupportedFilesystem; +use crate::filesystem::FileSystemContext; use crate::fuse_api_handle::FuseApiHandle; use crate::fuse_server::FuseServer; -use crate::gravitino_client::GravitinoClient; +use crate::gvfs_creator::create_gvfs_filesystem; use crate::gvfs_fileset_fs::GvfsFilesetFs; use crate::memory_filesystem::MemoryFileSystem; -use crate::open_dal_filesystem::OpenDalFileSystem; use crate::utils::GvfsResult; use log::info; use once_cell::sync::Lazy; -use std::path::Path; use std::sync::Arc; use tokio::sync::Mutex; -const FILESET_PREFIX: &str = "gvfs://fileset/"; - static SERVER: Lazy>>> = Lazy::new(|| Mutex::new(None)); pub(crate) enum CreateFsResult { @@ -132,122 +128,3 @@ pub async fn create_path_fs( create_gvfs_filesystem(mount_from, config, fs_context).await } } - -pub async fn create_gvfs_filesystem( - mount_from: &str, - config: &AppConfig, - fs_context: &FileSystemContext, -) -> GvfsResult { - // Gvfs-fuse filesystem structure: - // FuseApiHandle - // ├─ DefaultRawFileSystem (RawFileSystem) - // │ └─ FileSystemLog (PathFileSystem) - // │ ├─ GravitinoComposedFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ S3FileSystem (PathFileSystem) - // │ │ │ └─ OpenDALFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ HDFSFileSystem (PathFileSystem) - // │ │ │ └─ OpenDALFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ JuiceFileSystem (PathFileSystem) - // │ │ │ └─ NasFileSystem (PathFileSystem) - // │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem) - // │ │ │ └─ XXXFileSystem (PathFileSystem) - // - // `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs. - // It manages file and directory relationships, as well as file mappings. - // It delegates file operations to the PathFileSystem - // - // `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs. - // Similar implementations include permissions, caching, and metrics. - // - // `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`. - // It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage. - // If the user only mounts a fileset, this layer is not present. There will only be one below layer. - // - // `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path. - // and delegate the operation to the real storage. - // - // `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage. - // it can assess the S3, HDFS, gcs, azblob and other storage. - // - // `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage. - // - // `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage. - // - // `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS. - // - // `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage. - // - // `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions. - - let client = GravitinoClient::new(&config.gravitino); - - let (catalog, schema, fileset) = extract_fileset(mount_from)?; - let location = client - .get_fileset(&catalog, &schema, &fileset) - .await? - .storage_location; - let (schema, location) = extract_storage_filesystem(&location).unwrap(); - - let inner_fs = create_fs_by_schema(&schema, config, fs_context)?; - - let fs = GvfsFilesetFs::new(inner_fs, Path::new(&location), client, config, fs_context).await; - Ok(CreateFsResult::Gvfs(fs)) -} - -fn create_fs_by_schema( - schema: &FileSystemSchema, - config: &AppConfig, - fs_context: &FileSystemContext, -) -> GvfsResult> { - match schema { - FileSystemSchema::S3 => OpenDalFileSystem::create_file_system(schema, config, fs_context), - } -} - -pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> { - if !path.starts_with(FILESET_PREFIX) { - return Err(InvalidConfig.to_error("Invalid fileset path".to_string())); - } - - let path_without_prefix = &path[FILESET_PREFIX.len()..]; - - let parts: Vec<&str> = path_without_prefix.split('/').collect(); - - if parts.len() != 3 { - return Err(InvalidConfig.to_error("Invalid fileset path".to_string())); - } - // todo handle mount catalog or schema - - let catalog = parts[1].to_string(); - let schema = parts[2].to_string(); - let fileset = parts[3].to_string(); - - Ok((catalog, schema, fileset)) -} - -pub fn extract_storage_filesystem(path: &str) -> Option<(FileSystemSchema, String)> { - // todo need to improve the logic - if let Some(pos) = path.find("://") { - let protocol = &path[..pos]; - let location = &path[pos + 3..]; - let location = match location.find('/') { - Some(index) => &location[index + 1..], - None => "", - }; - let location = match location.ends_with('/') { - true => location.to_string(), - false => format!("{}/", location), - }; - - match protocol { - "s3" => Some((FileSystemSchema::S3, location.to_string())), - "s3a" => Some((FileSystemSchema::S3, location.to_string())), - _ => None, - } - } else { - None - } -} diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs index 4f425a6be18..a0c4a742541 100644 --- a/clients/filesystem-fuse/src/lib.rs +++ b/clients/filesystem-fuse/src/lib.rs @@ -26,6 +26,7 @@ mod filesystem; mod fuse_api_handle; mod fuse_server; mod gravitino_client; +mod gvfs_creator; mod gvfs_fileset_fs; mod gvfs_fuse; mod memory_filesystem;