diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock index 6a3952492a5..8ca5394fa01 100644 --- a/bin/ofs/Cargo.lock +++ b/bin/ofs/Cargo.lock @@ -57,9 +57,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] name = "anstyle-parse" @@ -216,9 +216,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.33" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", @@ -419,9 +419,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05e7cf40684ae96ade6232ed84582f40ce0a66efcd43a5117aef610534f8e0b8" +checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" dependencies = [ "anstream", "anstyle", @@ -1026,7 +1026,8 @@ name = "ofs" version = "0.0.1+core.0.45.0" dependencies = [ "anyhow", - "async-trait", + "bytes", + "chrono", "clap", "env_logger", "fuse3", @@ -1035,6 +1036,7 @@ dependencies = [ "log", "nix 0.27.1", "opendal", + "sharded-slab", "tokio", "url", ] @@ -1559,6 +1561,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signature" version = "2.2.0" diff --git a/bin/ofs/Cargo.toml b/bin/ofs/Cargo.toml index 187e81d4f8a..f6a64bc5e0b 100644 --- a/bin/ofs/Cargo.toml +++ b/bin/ofs/Cargo.toml @@ -31,14 +31,10 @@ rust-version = "1.67" [dependencies] anyhow = "1" -async-trait = "0.1.75" clap = { version = "4.5.1", features = ["derive", "env"] } -env_logger = "0.11" -fuse3 = { "version" = "0.6.1", "features" = ["tokio-runtime", "unprivileged"] } +env_logger = "0.11.2" futures-util = "0.3.30" -libc = "0.2.151" log = "0.4.20" -nix = { version = "0.27.1", features = ["user"] } opendal = { version = "0.45.0", path = "../../core" } tokio = { version = "1.34", features = [ "fs", @@ -47,3 +43,11 @@ tokio = { version = "1.34", features = [ "io-std", ] } url = "2.5.0" +chrono = "0.4.34" +sharded-slab = "0.1.7" +bytes = "1.5.0" + +[target.'cfg(target_os = "linux")'.dependencies] +libc = "0.2.151" +fuse3 = { "version" = "0.6.1", "features" = ["tokio-runtime", "unprivileged"] } +nix = { version = "0.27.1", features = ["user"] } diff --git a/bin/ofs/src/bin/ofs.rs b/bin/ofs/src/bin/ofs.rs index 08399b56ef2..89a61992872 100644 --- a/bin/ofs/src/bin/ofs.rs +++ b/bin/ofs/src/bin/ofs.rs @@ -15,69 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; -use std::str::FromStr; - -use anyhow::anyhow; -use anyhow::Context; use anyhow::Result; use clap::Parser; -use fuse3::path::Session; -use fuse3::MountOptions; -use ofs::Ofs; -use opendal::Operator; -use opendal::Scheme; -use url::Url; #[tokio::main] async fn main() -> Result<()> { - env_logger::init(); - fuse().await -} - -#[derive(Parser, Debug)] -#[command(version, about)] -struct Config { - /// fuse mount path - #[arg(env = "OFS_MOUNT_PATH", index = 1)] - mount_path: String, - - /// location of opendal service - /// format: ://?=&= - /// example: fs://root=/tmp - #[arg(env = "OFS_BACKEND", index = 2)] - backend: String, -} - -async fn fuse() -> Result<()> { - let cfg = Config::try_parse().context("parse command line arguments")?; - - let location = Url::parse(&cfg.backend)?; - if location.has_host() { - Err(anyhow!("Host part in a location is not supported."))?; - } + let cfg = ofs::Config::parse(); - let scheme_str = location.scheme(); - - let op_args = location - .query_pairs() - .into_owned() - .collect::>(); - - let scheme = Scheme::from_str(scheme_str).context("unsupported scheme")?; - let op = Operator::via_map(scheme, op_args)?; - - let mut mount_option = MountOptions::default(); - mount_option.uid(nix::unistd::getuid().into()); - mount_option.gid(nix::unistd::getgid().into()); - - let ofs = Ofs { op }; - - let mounthandle = Session::new(mount_option) - .mount_with_unprivileged(ofs, cfg.mount_path) - .await?; - - mounthandle.await?; - - Ok(()) + env_logger::init(); + ofs::new_app(cfg).await } diff --git a/bin/ofs/src/config.rs b/bin/ofs/src/config.rs new file mode 100644 index 00000000000..6afa729a79e --- /dev/null +++ b/bin/ofs/src/config.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use clap::Parser; +use url::Url; + +#[derive(Parser, Debug)] +#[command(version, about)] +pub struct Config { + /// fuse mount path + #[arg(env = "OFS_MOUNT_PATH", index = 1)] + pub(crate) mount_path: String, + + /// location of opendal service + /// format: ://?=&= + /// example: fs://?root=/tmp + #[arg(env = "OFS_BACKEND", index = 2)] + pub(crate) backend: Url, +} diff --git a/bin/ofs/src/fuse.rs b/bin/ofs/src/fuse.rs new file mode 100644 index 00000000000..214187e861e --- /dev/null +++ b/bin/ofs/src/fuse.rs @@ -0,0 +1,670 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ffi::OsStr; +use std::ffi::OsString; +use std::ops::Deref; +use std::path::PathBuf; +use std::time::Duration; +use std::time::SystemTime; + +use bytes::Bytes; +use fuse3::async_trait; +use fuse3::path::prelude::*; +use fuse3::Errno; +use fuse3::Result; +use futures_util::stream; +use futures_util::stream::BoxStream; +use futures_util::StreamExt; + +use opendal::Entry; +use opendal::EntryMode; +use opendal::ErrorKind; +use opendal::Metadata; +use opendal::Operator; +use sharded_slab::Slab; + +const TTL: Duration = Duration::from_secs(1); // 1 second + +#[derive(Debug, Clone)] +struct OpenedFile { + path: OsString, + is_read: bool, + is_write: bool, + is_append: bool, +} + +pub(super) struct Ofs { + op: Operator, + gid: u32, + uid: u32, + opened_files: Slab, +} + +impl Ofs { + pub fn new(op: Operator, uid: u32, gid: u32) -> Self { + Self { + op, + uid, + gid, + opened_files: Slab::new(), + } + } + + fn check_flags(&self, flags: u32) -> Result<(bool, bool)> { + let mode = flags & libc::O_ACCMODE as u32; + let is_read = mode == libc::O_RDONLY as u32 || mode == libc::O_RDWR as u32; + let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR as u32; + if !is_read && !is_write { + Err(Errno::from(libc::EINVAL))?; + } + + let capability = self.op.info().full_capability(); + if is_read && !capability.read { + Err(Errno::from(libc::EACCES))?; + } + if is_write && !capability.write { + Err(Errno::from(libc::EACCES))?; + } + + log::trace!("check_flags: is_read={}, is_write={}", is_read, is_write); + Ok((is_read, is_write)) + } + + // Get opened file and check given path + fn get_opened_file(&self, key: usize, path: Option<&OsStr>) -> Result { + let file = self + .opened_files + .get(key) + .as_ref() + .ok_or(Errno::from(libc::ENOENT))? + .deref() + .clone(); + if matches!(path, Some(path) if path != file.path) { + log::trace!( + "get_opened_file: path not match: path={:?}, file={:?}", + path, + file.path + ); + Err(Errno::from(libc::EBADF))?; + } + + Ok(file) + } +} + +#[async_trait] +impl PathFilesystem for Ofs { + type DirEntryStream = BoxStream<'static, Result>; + type DirEntryPlusStream = BoxStream<'static, Result>; + + // Init a fuse filesystem + async fn init(&self, _req: Request) -> Result<()> { + Ok(()) + } + + // Callback when fs is being destroyed + async fn destroy(&self, _req: Request) {} + + async fn lookup(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result { + log::debug!("lookup(parent={:?}, name={:?})", parent, name); + + let path = PathBuf::from(parent).join(name); + let metadata = self + .op + .stat(&path.to_string_lossy()) + .await + .map_err(opendal_error2errno)?; + + let now = SystemTime::now(); + let attr = metadata2file_attr(&metadata, now, self.uid, self.gid); + + Ok(ReplyEntry { ttl: TTL, attr }) + } + + async fn getattr( + &self, + _req: Request, + path: Option<&OsStr>, + fh: Option, + flags: u32, + ) -> Result { + log::debug!("getattr(path={:?}, fh={:?}, flags={:?})", path, fh, flags); + + let key = fh.unwrap_or_default() - 1; + let fh_path = self + .opened_files + .get(key as usize) + .as_ref() + .map(|f| &f.path) + .cloned(); + + let file_path = match (path.map(Into::into), fh_path) { + (Some(a), Some(b)) => { + if a != b { + Err(Errno::from(libc::EBADF))?; + } + Some(a) + } + (a, b) => a.or(b), + }; + + let metadata = self + .op + .stat(&file_path.unwrap_or_default().to_string_lossy()) + .await + .map_err(opendal_error2errno)?; + + let now = SystemTime::now(); + let attr = metadata2file_attr(&metadata, now, self.uid, self.gid); + + Ok(ReplyAttr { ttl: TTL, attr }) + } + + async fn setattr( + &self, + _req: Request, + path: Option<&OsStr>, + fh: Option, + set_attr: SetAttr, + ) -> Result { + log::debug!( + "setattr(path={:?}, fh={:?}, set_attr={:?})", + path, + fh, + set_attr + ); + Err(libc::EOPNOTSUPP.into()) + } + + async fn symlink( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + link_path: &OsStr, + ) -> Result { + log::debug!( + "symlink(parent={:?}, name={:?}, link_path={:?})", + parent, + name, + link_path + ); + Err(libc::EOPNOTSUPP.into()) + } + + async fn mknod( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + _rdev: u32, + ) -> Result { + log::debug!( + "mknod(parent={:?}, name={:?}, mode=0o{:o})", + parent, + name, + mode + ); + Err(libc::EOPNOTSUPP.into()) + } + + async fn mkdir( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + _umask: u32, + ) -> Result { + log::debug!( + "mkdir(parent={:?}, name={:?}, mode=0o{:o})", + parent, + name, + mode + ); + + let mut path = PathBuf::from(parent).join(name); + path.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166 + self.op + .create_dir(&path.to_string_lossy()) + .await + .map_err(opendal_error2errno)?; + + let metadata = Metadata::new(EntryMode::DIR); + let now = SystemTime::now(); + let attr = metadata2file_attr(&metadata, now, self.uid, self.gid); + + Ok(ReplyEntry { ttl: TTL, attr }) + } + + async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { + log::debug!("unlink(parent={:?}, name={:?})", parent, name); + + let path = PathBuf::from(parent).join(name); + self.op + .delete(&path.to_string_lossy()) + .await + .map_err(opendal_error2errno)?; + + Ok(()) + } + + async fn rmdir(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { + log::debug!("rmdir(parent={:?}, name={:?})", parent, name); + + let path = PathBuf::from(parent).join(name); + self.op + .delete(&path.to_string_lossy()) + .await + .map_err(opendal_error2errno)?; + + Ok(()) + } + + async fn rename( + &self, + _req: Request, + origin_parent: &OsStr, + origin_name: &OsStr, + parent: &OsStr, + name: &OsStr, + ) -> Result<()> { + log::debug!( + "rename(p={:?}, name={:?}, newp={:?}, newname={:?})", + origin_parent, + origin_name, + parent, + name + ); + + let origin_path = PathBuf::from(origin_parent).join(origin_name); + let path = PathBuf::from(parent).join(name); + + self.op + .rename(&origin_path.to_string_lossy(), &path.to_string_lossy()) + .await + .map_err(opendal_error2errno)?; + + Ok(()) + } + + async fn link( + &self, + _req: Request, + path: &OsStr, + new_parent: &OsStr, + new_name: &OsStr, + ) -> Result { + log::debug!( + "link(path={:?}, new_parent={:?}, new_name={:?})", + path, + new_parent, + new_name + ); + Err(libc::EOPNOTSUPP.into()) + } + + async fn create( + &self, + _req: Request, + parent: &OsStr, + name: &OsStr, + mode: u32, + flags: u32, + ) -> Result { + log::debug!( + "create(parent={:?}, name={:?}, mode=0o{:o}, flags=0x{:x})", + parent, + name, + mode, + flags + ); + + let (is_read, is_write) = self.check_flags(flags)?; + + let path = PathBuf::from(parent).join(name); + self.op + .write(&path.to_string_lossy(), Bytes::new()) + .await + .map_err(opendal_error2errno)?; + + let metadata = Metadata::new(EntryMode::FILE); + let attr = metadata2file_attr(&metadata, SystemTime::now(), self.uid, self.gid); + + let fh = self + .opened_files + .insert(OpenedFile { + path: path.into(), + is_read, + is_write, + is_append: flags & libc::O_APPEND as u32 != 0, + }) + .ok_or(Errno::from(libc::EBUSY))? as u64 + + 1; // ensure fh > 0 + + Ok(ReplyCreated { + ttl: TTL, + attr, + generation: 0, + fh, + flags, + }) + } + + async fn release( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + flags: u32, + lock_owner: u64, + flush: bool, + ) -> Result<()> { + log::debug!( + "release(path={:?}, fh={}, flags=0x{:x}, lock_owner={}, flush={})", + path, + fh, + flags, + lock_owner, + flush + ); + + let key = fh as usize - 1; + let file = self + .opened_files + .take(key) + .ok_or(Errno::from(libc::EBADF))?; + if matches!(path, Some(ref p) if p != &file.path) { + Err(Errno::from(libc::EBADF))?; + } + + Ok(()) + } + + async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { + log::debug!("open(path={:?}, flags=0x{:x})", path, flags); + + let (is_read, is_write) = self.check_flags(flags)?; + + let fh = self + .opened_files + .insert(OpenedFile { + path: path.into(), + is_read, + is_write, + is_append: flags & libc::O_APPEND as u32 != 0, + }) + .ok_or(Errno::from(libc::EBUSY))? as u64 + + 1; // ensure fh > 0 + + Ok(ReplyOpen { fh, flags }) + } + + async fn read( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + offset: u64, + size: u32, + ) -> Result { + log::debug!( + "read(path={:?}, fh={}, offset={}, size={})", + path, + fh, + offset, + size + ); + + if fh == 0 { + Err(Errno::from(libc::EBADF))?; + } + let key = fh - 1; + let file = self.get_opened_file(key as _, path)?; + + if !file.is_read { + Err(Errno::from(libc::EACCES))?; + } + + let data = self + .op + .read_with(&file.path.to_string_lossy()) + .range(offset..offset + size as u64) + .await + .map_err(opendal_error2errno)?; + + Ok(ReplyData { data: data.into() }) + } + + async fn write( + &self, + _req: Request, + path: Option<&OsStr>, + fh: u64, + offset: u64, + data: &[u8], + flags: u32, + ) -> Result { + log::debug!( + "write(path={:?}, fh={}, offset={}, data_len={}, flags=0x{:x})", + path, + fh, + offset, + data.len(), + flags + ); + + if offset != 0 { + Err(Errno::from(libc::EINVAL))?; + } + + if fh == 0 { + Err(Errno::from(libc::EBADF))?; + } + let key = fh - 1; + + let file = self.get_opened_file(key as _, path)?; + if !file.is_write { + Err(Errno::from(libc::EACCES))?; + } + + self.op + .write_with( + &file.path.clone().to_string_lossy(), + Bytes::copy_from_slice(data), + ) + .append(file.is_append) + .await + .map_err(opendal_error2errno)?; + + Ok(ReplyWrite { + written: data.len() as _, + }) + } + + async fn readdir( + &self, + _req: Request, + path: &OsStr, + fh: u64, + offset: i64, + ) -> Result> { + log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset); + + let mut current_dir = PathBuf::from(path); + current_dir.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166 + let children = self + .op + .lister(¤t_dir.to_string_lossy()) + .await + .map_err(opendal_error2errno)? + .enumerate() + .map(|(i, entry)| { + entry + .map(|e| DirectoryEntry { + kind: entry_mode2file_type(e.metadata().mode()), + name: e.name().trim_matches('/').into(), + offset: (i + 3) as i64, + }) + .map_err(opendal_error2errno) + }); + + let relative_paths = stream::iter([ + Result::Ok(DirectoryEntry { + kind: FileType::Directory, + name: ".".into(), + offset: 1, + }), + Result::Ok(DirectoryEntry { + kind: FileType::Directory, + name: "..".into(), + offset: 2, + }), + ]); + + Ok(ReplyDirectory { + entries: relative_paths.chain(children).skip(offset as usize).boxed(), + }) + } + + async fn access(&self, _req: Request, path: &OsStr, mask: u32) -> Result<()> { + log::debug!("access(path={:?}, mask=0x{:x})", path, mask); + + self.check_flags(mask)?; + self.op + .stat(&path.to_string_lossy()) + .await + .map_err(opendal_error2errno)?; + + Ok(()) + } + + async fn readdirplus( + &self, + _req: Request, + parent: &OsStr, + fh: u64, + offset: u64, + _lock_owner: u64, + ) -> Result> { + log::debug!( + "readdirplus(parent={:?}, fh={}, offset={})", + parent, + fh, + offset + ); + + let make_entry = |op: Operator, i: usize, entry: opendal::Result, uid, gid, now| async move { + let e = entry.map_err(opendal_error2errno)?; + let metadata = op + .stat(e.name()) + .await + .unwrap_or_else(|_| e.metadata().clone()); + let attr = metadata2file_attr(&metadata, now, uid, gid); + Result::Ok(DirectoryEntryPlus { + kind: entry_mode2file_type(metadata.mode()), + name: e.name().trim_matches('/').into(), + offset: (i + 3) as i64, + attr, + entry_ttl: TTL, + attr_ttl: TTL, + }) + }; + + let now = SystemTime::now(); + let mut current_dir = PathBuf::from(parent); + current_dir.push(""); // ref https://users.rust-lang.org/t/trailing-in-paths/43166 + let op = self.op.clone(); + let uid = self.uid; + let gid = self.gid; + let children = self + .op + .lister(¤t_dir.to_string_lossy()) + .await + .map_err(opendal_error2errno)? + .enumerate() + .then(move |(i, entry)| make_entry(op.clone(), i, entry, uid, gid, now)); + + let relative_path_metadata = Metadata::new(EntryMode::DIR); + let relative_path_attr = metadata2file_attr(&relative_path_metadata, now, uid, gid); + let relative_paths = stream::iter([ + Result::Ok(DirectoryEntryPlus { + kind: FileType::Directory, + name: ".".into(), + offset: 1, + attr: relative_path_attr, + entry_ttl: TTL, + attr_ttl: TTL, + }), + Result::Ok(DirectoryEntryPlus { + kind: FileType::Directory, + name: "..".into(), + offset: 2, + attr: relative_path_attr, + entry_ttl: TTL, + attr_ttl: TTL, + }), + ]); + + Ok(ReplyDirectoryPlus { + entries: relative_paths.chain(children).skip(offset as usize).boxed(), + }) + } +} + +const fn entry_mode2file_type(mode: EntryMode) -> FileType { + match mode { + EntryMode::DIR => FileType::Directory, + _ => FileType::RegularFile, + } +} + +fn metadata2file_attr(metadata: &Metadata, atime: SystemTime, uid: u32, gid: u32) -> FileAttr { + let last_modified = metadata.last_modified().map(|t| t.into()).unwrap_or(atime); + let kind = entry_mode2file_type(metadata.mode()); + FileAttr { + size: metadata.content_length(), + blocks: 0, + atime, + mtime: last_modified, + ctime: last_modified, + kind, + perm: fuse3::perm_from_mode_and_kind(kind, 0o775), + nlink: 0, + uid, + gid, + rdev: 0, + blksize: 4096, + } +} + +fn opendal_error2errno(err: opendal::Error) -> fuse3::Errno { + log::trace!("opendal_error2errno: {:?}", err); + match err.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + ErrorKind::IsADirectory => Errno::from(libc::EISDIR), + ErrorKind::NotFound => Errno::from(libc::ENOENT), + ErrorKind::PermissionDenied => Errno::from(libc::EACCES), + ErrorKind::AlreadyExists => Errno::from(libc::EEXIST), + ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR), + ErrorKind::ContentTruncated => Errno::from(libc::EAGAIN), + ErrorKind::ContentIncomplete => Errno::from(libc::EIO), + _ => Errno::from(libc::ENOENT), + } +} diff --git a/bin/ofs/src/lib.rs b/bin/ofs/src/lib.rs index 3fc0cadae6b..c82d5e5edc8 100644 --- a/bin/ofs/src/lib.rs +++ b/bin/ofs/src/lib.rs @@ -15,209 +15,75 @@ // specific language governing permissions and limitations // under the License. -use std::ffi::OsStr; -use std::vec::IntoIter; - -use async_trait::async_trait; -use fuse3::path::prelude::*; -use fuse3::Result; -use futures_util::stream::Empty; -use futures_util::stream::Iter; -use opendal::Operator; - -pub struct Ofs { - pub op: Operator, -} - -#[async_trait] -impl PathFilesystem for Ofs { - type DirEntryStream = Empty>; - type DirEntryPlusStream = Iter>>; - - // Init a fuse filesystem - async fn init(&self, _req: Request) -> Result<()> { - Ok(()) - } +use std::collections::HashMap; +use std::str::FromStr; - // Callback when fs is being destroyed - async fn destroy(&self, _req: Request) {} +use anyhow::anyhow; +use anyhow::Result; +use opendal::Operator; +use opendal::Scheme; - async fn lookup(&self, _req: Request, _parent: &OsStr, _name: &OsStr) -> Result { - // TODO - Err(libc::ENOSYS.into()) - } +pub mod config; +pub use config::Config; - async fn getattr( - &self, - _req: Request, - path: Option<&OsStr>, - _fh: Option, - _flags: u32, - ) -> Result { - // TODO - log::debug!("getattr(path={:?})", path); - - Err(libc::ENOSYS.into()) - } - - async fn read( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - offset: u64, - size: u32, - ) -> Result { - // TODO - log::debug!( - "read(path={:?}, fh={}, offset={}, size={})", - path, - fh, - offset, - size - ); - - Err(libc::ENOSYS.into()) - } +#[cfg(target_os = "linux")] +mod fuse; - async fn mkdir( - &self, - _req: Request, - parent: &OsStr, - name: &OsStr, - mode: u32, - _umask: u32, - ) -> Result { - // TODO - log::debug!( - "mkdir(parent={:?}, name={:?}, mode=0o{:o})", - parent, - name, - mode - ); - - Err(libc::ENOSYS.into()) +pub async fn new_app(cfg: Config) -> Result<()> { + if cfg.backend.has_host() { + log::warn!("backend host will be ignored"); } - async fn readdir( - &self, - _req: Request, - path: &OsStr, - fh: u64, - offset: i64, - ) -> Result> { - // TODO - log::debug!("readdir(path={:?}, fh={}, offset={})", path, fh, offset); - - Err(libc::ENOSYS.into()) - } + let scheme_str = cfg.backend.scheme(); + let op_args = cfg + .backend + .query_pairs() + .into_owned() + .collect::>(); + + let scheme = match Scheme::from_str(scheme_str) { + Ok(Scheme::Custom(_)) | Err(_) => Err(anyhow!("invalid scheme: {}", scheme_str)), + Ok(s) => Ok(s), + }?; + let backend = Operator::via_map(scheme, op_args)?; + + let args = Args { + mount_path: cfg.mount_path, + backend, + }; + execute(args).await +} - async fn mknod( - &self, - _req: Request, - parent: &OsStr, - name: &OsStr, - mode: u32, - _rdev: u32, - ) -> Result { - // TODO - log::debug!( - "mknod(parent={:?}, name={:?}, mode=0o{:o})", - parent, - name, - mode - ); - - Err(libc::ENOSYS.into()) - } +struct Args { + mount_path: String, + backend: Operator, +} - async fn open(&self, _req: Request, path: &OsStr, flags: u32) -> Result { - // TODO - log::debug!("open(path={:?}, flags=0x{:x})", path, flags); +#[cfg(not(target_os = "linux"))] +async fn execute(_: FrontendArgs) -> Result<()> { + Err(anyhow::anyhow!("platform not supported")) +} - Err(libc::ENOSYS.into()) - } +#[cfg(target_os = "linux")] +async fn execute(args: Args) -> Result<()> { + use fuse3::path::Session; + use fuse3::MountOptions; - async fn setattr( - &self, - _req: Request, - path: Option<&OsStr>, - _fh: Option, - _set_attr: SetAttr, - ) -> Result { - // TODO - log::debug!("setattr(path={:?})", path); - - Err(libc::ENOSYS.into()) - } + let uid = nix::unistd::getuid(); + let gid = nix::unistd::getgid(); - async fn write( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - offset: u64, - data: &[u8], - flags: u32, - ) -> Result { - // TODO - log::debug!( - "write(path={:?}, fh={}, offset={}, len={}, flags=0x{:x})", - path, - fh, - offset, - data.len(), - flags - ); - - Err(libc::ENOSYS.into()) - } + let mut mount_option = MountOptions::default(); + mount_option.uid(uid.into()); + mount_option.gid(gid.into()); + mount_option.no_open_dir_support(true); - async fn release( - &self, - _req: Request, - path: Option<&OsStr>, - fh: u64, - flags: u32, - _lock_owner: u64, - flush: bool, - ) -> Result<()> { - // TODO - log::debug!( - "release(path={:?}, fh={}, flags={}, flush={})", - path, - fh, - flags, - flush - ); - - Err(libc::ENOSYS.into()) - } + let ofs = fuse::Ofs::new(args.backend, uid.into(), gid.into()); - async fn rename( - &self, - _req: Request, - origin_parent: &OsStr, - origin_name: &OsStr, - parent: &OsStr, - name: &OsStr, - ) -> Result<()> { - // TODO - log::debug!( - "rename(p={:?}, name={:?}, newp={:?}, newname={:?})", - origin_parent, - origin_name, - parent, - name - ); - - Err(libc::ENOSYS.into()) - } + let mount_handle = Session::new(mount_option) + .mount_with_unprivileged(ofs, args.mount_path) + .await?; - async fn unlink(&self, _req: Request, parent: &OsStr, name: &OsStr) -> Result<()> { - // TODO - log::debug!("unlink(parent={:?}, name={:?})", parent, name); + mount_handle.await?; - Err(libc::ENOSYS.into()) - } + Ok(()) }