Skip to content

Commit

Permalink
Temp
Browse files Browse the repository at this point in the history
  • Loading branch information
diqiu50 committed Dec 25, 2024
1 parent d4da504 commit 9c4748a
Show file tree
Hide file tree
Showing 17 changed files with 884 additions and 30 deletions.
7 changes: 7 additions & 0 deletions clients/filesystem-fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ log = "0.4.22"
once_cell = "1.20.2"
tokio = { version = "1.38.0", features = ["full"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
serde = { version = "1.0.216", features = ["derive"] }
toml = "0.8.19"
reqwest = { version = "0.12.9", features = ["json"] }
urlencoding = "2.1.3"

[dev-dependencies]
mockito = "0.31"
38 changes: 38 additions & 0 deletions clients/filesystem-fuse/conf/gvfs_test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# fuse settings
[fuse]
default_mask = 0o600

[fuse.properties]
key1 = "value1"
key2 = "value2"

# filesystem settings
[filesystem]
block_size = 8192

# Gravitino settings
[gravitino]
gravitino_url = "http://localhost:8090"
metalake = "test"

# extent settings
[extent_config]
access_key = "XXX_access_key"
secret_key = "XXX_secret_key"
86 changes: 86 additions & 0 deletions clients/filesystem-fuse/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
pub struct Config {
pub fuse: FuseConfig,
pub filesystem: FilesystemConfig,
pub gravitino: GravitinoConfig,
pub extent_config: HashMap<String, String>,
}

impl Config {
pub fn from_file(file: &str) -> Config {
let config_content = std::fs::read_to_string(file).unwrap();
let config = toml::from_str::<Config>(&config_content).unwrap();
config
}

pub fn default() -> Config {
Config {
fuse: FuseConfig {
default_mask: 0o600,
fs_type: "memory".to_string(),
properties: HashMap::new(),
},
filesystem: FilesystemConfig { block_size: 4096 },
gravitino: GravitinoConfig {
gravitino_url: "http://localhost:8090".to_string(),
metalake: "test".to_string(),
},
extent_config: HashMap::new(),
}
}
}

#[derive(Debug, Deserialize)]
pub struct FuseConfig {
pub default_mask: u32,
pub fs_type: String,
pub properties: HashMap<String, String>,
}

#[derive(Debug, Deserialize)]
pub struct FilesystemConfig {
pub block_size: u32,
}

#[derive(Debug, Deserialize)]
pub struct GravitinoConfig {
pub gravitino_url: String,
pub metalake: String,
}

#[cfg(test)]
mod test {
use crate::config::Config;

#[test]
fn test_config_from_file() {
let config = Config::from_file("conf/gvfs_test.toml");
assert_eq!(config.fuse.default_mask, 0o600);
assert_eq!(config.filesystem.block_size, 8192);
assert_eq!(config.gravitino.gravitino_url, "http://localhost:8090");
assert_eq!(config.gravitino.metalake, "test");
assert_eq!(config.extent_config.get("access_key"), Some(&"XXX_access_key".to_string()));
assert_eq!(config.extent_config.get("secret_key"), Some(&"XXX_secret_key".to_string()));
}
}
8 changes: 3 additions & 5 deletions clients/filesystem-fuse/src/default_raw_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
use crate::filesystem::{
FileStat, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, ROOT_DIR_FILE_ID,
ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH,
};
use crate::filesystem::{FileStat, FileSystemContext, PathFileSystem, RawFileSystem, Result, INITIAL_FILE_ID, ROOT_DIR_FILE_ID, ROOT_DIR_PARENT_FILE_ID, ROOT_DIR_PATH};
use crate::opened_file::{FileHandle, OpenFileFlags};
use crate::opened_file_manager::OpenedFileManager;
use async_trait::async_trait;
Expand All @@ -30,6 +27,7 @@ use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use tokio::sync::RwLock;
use crate::config::Config;

/// DefaultRawFileSystem is a simple implementation for the file system.
/// it is used to manage the file metadata and file handle.
Expand All @@ -47,7 +45,7 @@ pub struct DefaultRawFileSystem<T: PathFileSystem> {
}

impl<T: PathFileSystem> DefaultRawFileSystem<T> {
pub(crate) fn new(fs: T) -> Self {
pub(crate) fn new(fs: T, _config: &Config, _fs_context: &FileSystemContext) -> Self {
Self {
file_entry_manager: RwLock::new(FileEntryManager::new()),
opened_file_manager: OpenedFileManager::new(),
Expand Down
62 changes: 62 additions & 0 deletions clients/filesystem-fuse/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use fuse3::Errno;

#[derive(Debug)]
pub enum ErrorCode {
UnSupportedFilesystem,
GravitinoClientError,
}

impl ErrorCode {
pub fn to_string(&self) -> String {
match self {
ErrorCode::UnSupportedFilesystem => "The filesystem is not supported".to_string(),
_ => "".to_string(),
}
}
pub fn to_error(self, message: impl Into<String>) -> GvfsError {
GvfsError::Error(self, message.into())
}
}

#[derive(Debug)]
pub enum GvfsError {
RestError(String, reqwest::Error),
Error(ErrorCode, String),
Errno(Errno),
IOError(std::io::Error),
}
impl From<reqwest::Error> for GvfsError {
fn from(err: reqwest::Error) -> Self {
GvfsError::RestError("Http request failed:".to_owned() + &err.to_string(), err)
}
}

impl From<Errno> for GvfsError {
fn from(errno: Errno) -> Self {
GvfsError::Errno(errno)
}
}

impl From<std::io::Error> for GvfsError {
fn from(err: std::io::Error) -> Self {
GvfsError::IOError(err)
}
}
6 changes: 6 additions & 0 deletions clients/filesystem-fuse/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ pub(crate) trait PathFileSystem: Send + Sync {
/// Create the directory by file path , if successful, return the file stat
async fn create_dir(&self, path: &Path) -> Result<FileStat>;


/// Set the file attribute by file path and file stat
async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()>;

Expand All @@ -129,6 +130,8 @@ pub(crate) trait PathFileSystem: Send + Sync {

/// Remove the directory by file path
async fn remove_dir(&self, path: &Path) -> Result<()>;

fn get_capacity(&self) -> Result<FileSystemCapacity>;
}

// FileSystemContext is the system environment for the fuse file system.
Expand Down Expand Up @@ -161,6 +164,9 @@ impl FileSystemContext {
}
}

// capacity of the file system
pub struct FileSystemCapacity {}

// FileStat is the file metadata of the file
#[derive(Clone, Debug)]
pub struct FileStat {
Expand Down
3 changes: 2 additions & 1 deletion clients/filesystem-fuse/src/fuse_api_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use futures_util::StreamExt;
use std::ffi::{OsStr, OsString};
use std::num::NonZeroU32;
use std::time::{Duration, SystemTime};
use crate::config::Config;

pub(crate) struct FuseApiHandle<T: RawFileSystem> {
fs: T,
Expand All @@ -44,7 +45,7 @@ impl<T: RawFileSystem> FuseApiHandle<T> {
const DEFAULT_ATTR_TTL: Duration = Duration::from_secs(1);
const DEFAULT_MAX_WRITE_SIZE: u32 = 16 * 1024;

pub fn new(fs: T, context: FileSystemContext) -> Self {
pub fn new(fs: T, _config: &Config, context: FileSystemContext) -> Self {
Self {
fs: fs,
default_ttl: Self::DEFAULT_ATTR_TTL,
Expand Down
8 changes: 5 additions & 3 deletions clients/filesystem-fuse/src/fuse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
* under the License.
*/
use fuse3::raw::{Filesystem, Session};
use fuse3::{MountOptions, Result};
use fuse3::{MountOptions};
use log::{error, info};
use std::process::exit;
use std::sync::Arc;
use tokio::select;
use tokio::sync::Notify;
use crate::utils::GvfsResult;

/// Represents a FUSE server capable of starting and stopping the FUSE filesystem.
pub struct FuseServer {
Expand All @@ -43,7 +44,7 @@ impl FuseServer {
}

/// Starts the FUSE filesystem and blocks until it is stopped.
pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> Result<()> {
pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> GvfsResult<()> {
//check if the mount point exists
if !std::path::Path::new(&self.mount_point).exists() {
error!("Mount point {} does not exist", self.mount_point);
Expand Down Expand Up @@ -83,11 +84,12 @@ impl FuseServer {
}

/// Stops the FUSE filesystem.
pub async fn stop(&self) {
pub async fn stop(&self) -> GvfsResult<()>{
info!("Stopping FUSE filesystem...");
self.close_notify.notify_one();

// wait for the filesystem to stop
self.close_notify.notified().await;
Ok(())
}
}
Loading

0 comments on commit 9c4748a

Please sign in to comment.