Skip to content

Commit

Permalink
[#5982] feat (gvfs-fuse): Implement Gravitino fileset file system (#5984
Browse files Browse the repository at this point in the history
)

### What changes were proposed in this pull request?

Implement an Gravitino fileset file system, Support mount fileset to
local directory

### Why are the changes needed?

Fix: #5982

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT and IT
  • Loading branch information
diqiu50 authored and jerryshao committed Jan 3, 2025
1 parent e7c8608 commit 2c4dfde
Show file tree
Hide file tree
Showing 19 changed files with 1,281 additions and 172 deletions.
7 changes: 7 additions & 0 deletions clients/filesystem-fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ name = "gvfs_fuse"
[dependencies]
async-trait = "0.1"
bytes = "1.6.0"
config = "0.13"
dashmap = "6.1.0"
fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
futures-util = "0.3.30"
libc = "0.2.168"
log = "0.4.22"
once_cell = "1.20.2"
reqwest = { version = "0.12.9", features = ["json"] }
serde = { version = "1.0.216", features = ["derive"] }
tokio = { version = "1.38.0", features = ["full"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
urlencoding = "2.1.3"

[dev-dependencies]
mockito = "0.31"
38 changes: 38 additions & 0 deletions clients/filesystem-fuse/conf/gvfs_fuse.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# fuse settings
[fuse]
file_mask = 0o600
dir_mask = 0o700
fs_type = "memory"

[fuse.properties]

# filesystem settings
[filesystem]
block_size = 8192

# Gravitino settings
[gravitino]
uri = "http://localhost:8090"
metalake = "your_metalake"

# extent settings
[extend_config]
access_key = "your access_key"
secret_key = "your_secret_key"
330 changes: 330 additions & 0 deletions clients/filesystem-fuse/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::error::ErrorCode::{ConfigNotFound, InvalidConfig};
use crate::utils::GvfsResult;
use config::{builder, Config};
use log::{error, info, warn};
use serde::Deserialize;
use std::collections::HashMap;
use std::fs;

pub(crate) const CONF_FUSE_FILE_MASK: ConfigEntity<u32> = ConfigEntity::new(
FuseConfig::MODULE_NAME,
"file_mask",
"The default file mask for the FUSE filesystem",
0o600,
);

pub(crate) const CONF_FUSE_DIR_MASK: ConfigEntity<u32> = ConfigEntity::new(
FuseConfig::MODULE_NAME,
"dir_mask",
"The default directory mask for the FUSE filesystem",
0o700,
);

pub(crate) const CONF_FUSE_FS_TYPE: ConfigEntity<&'static str> = ConfigEntity::new(
FuseConfig::MODULE_NAME,
"fs_type",
"The type of the FUSE filesystem",
"memory",
);

pub(crate) const CONF_FUSE_CONFIG_PATH: ConfigEntity<&'static str> = ConfigEntity::new(
FuseConfig::MODULE_NAME,
"config_path",
"The path of the FUSE configuration file",
"/etc/gvfs/gvfs.toml",
);

pub(crate) const CONF_FILESYSTEM_BLOCK_SIZE: ConfigEntity<u32> = ConfigEntity::new(
FilesystemConfig::MODULE_NAME,
"block_size",
"The block size of the gvfs fuse filesystem",
4096,
);

pub(crate) const CONF_GRAVITINO_URI: ConfigEntity<&'static str> = ConfigEntity::new(
GravitinoConfig::MODULE_NAME,
"uri",
"The URI of the Gravitino server",
"http://localhost:8090",
);

pub(crate) const CONF_GRAVITINO_METALAKE: ConfigEntity<&'static str> = ConfigEntity::new(
GravitinoConfig::MODULE_NAME,
"metalake",
"The metalake of the Gravitino server",
"",
);

pub(crate) struct ConfigEntity<T: 'static> {
module: &'static str,
name: &'static str,
description: &'static str,
pub(crate) default: T,
}

impl<T> ConfigEntity<T> {
const fn new(
module: &'static str,
name: &'static str,
description: &'static str,
default: T,
) -> Self {
ConfigEntity {
module: module,
name: name,
description: description,
default: default,
}
}
}

enum ConfigValue {
I32(ConfigEntity<i32>),
U32(ConfigEntity<u32>),
String(ConfigEntity<&'static str>),
Bool(ConfigEntity<bool>),
Float(ConfigEntity<f64>),
}

struct DefaultConfig {
configs: HashMap<String, ConfigValue>,
}

impl Default for DefaultConfig {
fn default() -> Self {
let mut configs = HashMap::new();

configs.insert(
Self::compose_key(CONF_FUSE_FILE_MASK),
ConfigValue::U32(CONF_FUSE_FILE_MASK),
);
configs.insert(
Self::compose_key(CONF_FUSE_DIR_MASK),
ConfigValue::U32(CONF_FUSE_DIR_MASK),
);
configs.insert(
Self::compose_key(CONF_FUSE_FS_TYPE),
ConfigValue::String(CONF_FUSE_FS_TYPE),
);
configs.insert(
Self::compose_key(CONF_FUSE_CONFIG_PATH),
ConfigValue::String(CONF_FUSE_CONFIG_PATH),
);
configs.insert(
Self::compose_key(CONF_GRAVITINO_URI),
ConfigValue::String(CONF_GRAVITINO_URI),
);
configs.insert(
Self::compose_key(CONF_GRAVITINO_METALAKE),
ConfigValue::String(CONF_GRAVITINO_METALAKE),
);
configs.insert(
Self::compose_key(CONF_FILESYSTEM_BLOCK_SIZE),
ConfigValue::U32(CONF_FILESYSTEM_BLOCK_SIZE),
);

DefaultConfig { configs }
}
}

impl DefaultConfig {
fn compose_key<T>(entity: ConfigEntity<T>) -> String {
format!("{}.{}", entity.module, entity.name)
}
}

#[derive(Debug, Deserialize)]
pub struct AppConfig {
#[serde(default)]
pub fuse: FuseConfig,
#[serde(default)]
pub filesystem: FilesystemConfig,
#[serde(default)]
pub gravitino: GravitinoConfig,
#[serde(default)]
pub extend_config: HashMap<String, String>,
}

impl Default for AppConfig {
fn default() -> Self {
let builder = Self::crete_default_config_builder();
let conf = builder
.build()
.expect("Failed to build default configuration");
conf.try_deserialize::<AppConfig>()
.expect("Failed to deserialize default AppConfig")
}
}

type ConfigBuilder = builder::ConfigBuilder<builder::DefaultState>;

impl AppConfig {
fn crete_default_config_builder() -> ConfigBuilder {
let default = DefaultConfig::default();

default
.configs
.values()
.fold(
Config::builder(),
|builder, config_entity| match config_entity {
ConfigValue::I32(entity) => Self::add_config(builder, entity),
ConfigValue::U32(entity) => Self::add_config(builder, entity),
ConfigValue::String(entity) => Self::add_config(builder, entity),
ConfigValue::Bool(entity) => Self::add_config(builder, entity),
ConfigValue::Float(entity) => Self::add_config(builder, entity),
},
)
}

fn add_config<T: Clone + Into<config::Value>>(
builder: ConfigBuilder,
entity: &ConfigEntity<T>,
) -> ConfigBuilder {
let name = format!("{}.{}", entity.module, entity.name);
builder
.set_default(&name, entity.default.clone().into())
.unwrap_or_else(|e| panic!("Failed to set default for {}: {}", entity.name, e))
}

pub fn from_file(config_file_path: Option<&str>) -> GvfsResult<AppConfig> {
let builder = Self::crete_default_config_builder();

let config_path = {
if config_file_path.is_some() {
let path = config_file_path.unwrap();
//check config file exists
if fs::metadata(path).is_err() {
return Err(
ConfigNotFound.to_error("The configuration file not found".to_string())
);
}
info!("Use configuration file: {}", path);
path
} else {
//use default config
if fs::metadata(CONF_FUSE_CONFIG_PATH.default).is_err() {
warn!(
"The default configuration file is not found, using the default configuration"
);
return Ok(AppConfig::default());
} else {
warn!(
"Using the default config file {}",
CONF_FUSE_CONFIG_PATH.default
);
}
CONF_FUSE_CONFIG_PATH.default
}
};
let config = builder
.add_source(config::File::with_name(config_path).required(true))
.build();
if let Err(e) = config {
let msg = format!("Failed to build configuration: {}", e);
error!("{}", msg);
return Err(InvalidConfig.to_error(msg));
}

let conf = config.unwrap();
let app_config = conf.try_deserialize::<AppConfig>();

if let Err(e) = app_config {
let msg = format!("Failed to deserialize configuration: {}", e);
error!("{}", msg);
return Err(InvalidConfig.to_error(msg));
}
Ok(app_config.unwrap())
}
}

#[derive(Debug, Deserialize, Default)]
pub struct FuseConfig {
#[serde(default)]
pub file_mask: u32,
#[serde(default)]
pub dir_mask: u32,
#[serde(default)]
pub fs_type: String,
#[serde(default)]
pub config_path: String,
#[serde(default)]
pub properties: HashMap<String, String>,
}

impl FuseConfig {
const MODULE_NAME: &'static str = "fuse";
}

#[derive(Debug, Deserialize, Default)]
pub struct FilesystemConfig {
#[serde(default)]
pub block_size: u32,
}

impl FilesystemConfig {
const MODULE_NAME: &'static str = "filesystem";
}

#[derive(Debug, Deserialize, Default)]
pub struct GravitinoConfig {
#[serde(default)]
pub uri: String,
#[serde(default)]
pub metalake: String,
}

impl GravitinoConfig {
const MODULE_NAME: &'static str = "gravitino";
}

#[cfg(test)]
mod test {
use crate::config::AppConfig;

#[test]
fn test_config_from_file() {
let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_test.toml")).unwrap();
assert_eq!(config.fuse.file_mask, 0o644);
assert_eq!(config.fuse.dir_mask, 0o755);
assert_eq!(config.filesystem.block_size, 8192);
assert_eq!(config.gravitino.uri, "http://localhost:8090");
assert_eq!(config.gravitino.metalake, "test");
assert_eq!(
config.extend_config.get("access_key"),
Some(&"XXX_access_key".to_string())
);
assert_eq!(
config.extend_config.get("secret_key"),
Some(&"XXX_secret_key".to_string())
);
}

#[test]
fn test_default_config() {
let config = AppConfig::default();
assert_eq!(config.fuse.file_mask, 0o600);
assert_eq!(config.fuse.dir_mask, 0o700);
assert_eq!(config.filesystem.block_size, 4096);
assert_eq!(config.gravitino.uri, "http://localhost:8090");
assert_eq!(config.gravitino.metalake, "");
}
}
Loading

0 comments on commit 2c4dfde

Please sign in to comment.