From f78adee512429bcc19589bd79537d78d255a56d9 Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 26 Dec 2024 20:50:24 +0800 Subject: [PATCH] Support s3 --- clients/filesystem-fuse/Cargo.toml | 1 + clients/filesystem-fuse/src/gvfs_fuse.rs | 31 +- clients/filesystem-fuse/src/lib.rs | 1 + .../src/open_dal_filesystem.rs | 265 ++++++++++++++++++ clients/filesystem-fuse/src/opened_file.rs | 26 ++ 5 files changed, 310 insertions(+), 14 deletions(-) create mode 100644 clients/filesystem-fuse/src/open_dal_filesystem.rs diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/Cargo.toml index 4008ec5ca2f..3760bd5285f 100644 --- a/clients/filesystem-fuse/Cargo.toml +++ b/clients/filesystem-fuse/Cargo.toml @@ -42,6 +42,7 @@ futures-util = "0.3.30" libc = "0.2.168" log = "0.4.22" once_cell = "1.20.2" +opendal = { version = "0.46.0", features = ["services-s3"] } reqwest = { version = "0.12.9", features = ["json"] } serde = { version = "1.0.216", features = ["derive"] } tokio = { version = "1.38.0", features = ["full"] } diff --git a/clients/filesystem-fuse/src/gvfs_fuse.rs b/clients/filesystem-fuse/src/gvfs_fuse.rs index 31340ae3251..30c9b740ba0 100644 --- a/clients/filesystem-fuse/src/gvfs_fuse.rs +++ b/clients/filesystem-fuse/src/gvfs_fuse.rs @@ -19,12 +19,13 @@ use crate::config::AppConfig; use crate::default_raw_filesystem::DefaultRawFileSystem; use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem}; -use crate::filesystem::FileSystemContext; +use crate::filesystem::{FileSystemContext, PathFileSystem}; use crate::fuse_api_handle::FuseApiHandle; use crate::fuse_server::FuseServer; use crate::gravitino_client::GravitinoClient; use crate::gvfs_fileset_fs::GvfsFilesetFs; use crate::memory_filesystem::MemoryFileSystem; +use crate::open_dal_filesystem::OpenDalFileSystem; use crate::utils::GvfsResult; use log::info; use once_cell::sync::Lazy; @@ -188,22 +189,24 @@ pub async fn create_gvfs_filesystem( .get_fileset(&catalog, &schema, &fileset) .await? 
.storage_location;
-    let (_schema, location) = extract_storage_filesystem(&location).unwrap();
-
-    // todo need to replace the inner filesystem with the real storage filesystem
-    let inner_fs = MemoryFileSystem::new().await;
-
-    let fs = GvfsFilesetFs::new(
-        Box::new(inner_fs),
-        Path::new(&location),
-        client,
-        config,
-        fs_context,
-    )
-    .await;
+    let (schema, location) = extract_storage_filesystem(&location).unwrap();
+
+    let inner_fs = create_fs_by_schema(&schema, config, fs_context)?;
+
+    let fs = GvfsFilesetFs::new(inner_fs, Path::new(&location), client, config, fs_context).await;
     Ok(CreateFsResult::Gvfs(fs))
 }
 
+fn create_fs_by_schema(
+    schema: &FileSystemSchema,
+    config: &AppConfig,
+    fs_context: &FileSystemContext,
+) -> GvfsResult<Box<dyn PathFileSystem>> {
+    match schema { // pick the backend for the scheme parsed from the fileset storage location
+        FileSystemSchema::S3 => OpenDalFileSystem::create_file_system(schema, config, fs_context),
+    }
+}
+
 pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
     if !path.starts_with(FILESET_PREFIX) {
         return Err(InvalidConfig.to_error("Invalid fileset path".to_string()));
diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs
index f554c7f385e..4f425a6be18 100644
--- a/clients/filesystem-fuse/src/lib.rs
+++ b/clients/filesystem-fuse/src/lib.rs
@@ -29,6 +29,7 @@ mod gravitino_client;
 mod gvfs_fileset_fs;
 mod gvfs_fuse;
 mod memory_filesystem;
+mod open_dal_filesystem;
 mod opened_file;
 mod opened_file_manager;
 mod utils;
diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs b/clients/filesystem-fuse/src/open_dal_filesystem.rs
new file mode 100644
index 00000000000..e0a455102fd
--- /dev/null
+++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::AppConfig;
+use crate::filesystem::{
+    FileReader, FileStat, FileSystemCapacity, FileSystemContext, FileWriter, PathFileSystem, Result,
+};
+use crate::gvfs_fuse::FileSystemSchema;
+use crate::opened_file::{OpenFileFlags, OpenedFile};
+use crate::utils::GvfsResult;
+use async_trait::async_trait;
+use bytes::Bytes;
+use fuse3::FileType::{Directory, RegularFile};
+use fuse3::{Errno, FileType, Timestamp};
+use log::debug;
+use opendal::layers::LoggingLayer;
+use opendal::services::S3;
+use opendal::{Builder, EntryMode, ErrorKind, Metadata, Operator};
+use std::path::Path;
+use std::time::SystemTime;
+
+/// `PathFileSystem` implementation that forwards every operation to an OpenDAL `Operator`.
+pub(crate) struct OpenDalFileSystem {
+    op: Operator,
+}
+
+impl OpenDalFileSystem {
+    fn new(op: Operator, _config: &AppConfig, _fs_context: &FileSystemContext) -> Self {
+        Self { op }
+    }
+
+    /// Builds the boxed file system for `schema`, configuring S3 from `config.extent_config`.
+    pub(crate) fn create_file_system(
+        schema: &FileSystemSchema,
+        config: &AppConfig,
+        fs_context: &FileSystemContext,
+    ) -> GvfsResult<Box<dyn PathFileSystem>> {
+        match schema {
+            FileSystemSchema::S3 => {
+                let builder = S3::from_map(config.extent_config.clone());
+                let op = Operator::new(builder)
+                    .map_err(|e| crate::error::ErrorCode::InvalidConfig.to_error(e.to_string()))?
+                    .layer(LoggingLayer::default())
+                    .finish();
+                Ok(Box::new(OpenDalFileSystem::new(op, config, fs_context)))
+            }
+        }
+    }
+
+    fn opendal_meta_to_file_stat(&self, meta: &Metadata, file_stat: &mut FileStat) {
+        let now =
SystemTime::now();
+        let mtime = meta.last_modified().map(|x| x.into()).unwrap_or(now);
+
+        file_stat.size = meta.content_length();
+        file_stat.kind = opendal_filemode_to_filetype(meta.mode());
+        file_stat.ctime = Timestamp::from(mtime); // ctime mirrors mtime (or `now` when absent)
+        file_stat.atime = Timestamp::from(now); // atime is the time of this call
+        file_stat.mtime = Timestamp::from(mtime);
+    }
+}
+
+#[async_trait]
+impl PathFileSystem for OpenDalFileSystem {
+    async fn init(&self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn stat(&self, path: &Path) -> Result<FileStat> {
+        let file_name = path.to_string_lossy().to_string();
+        let meta = self
+            .op
+            .stat_with(&file_name)
+            .await
+            .map_err(opendal_error_to_errno)?;
+
+        let mut file_stat = FileStat::new_file_filestat_with_path(path, 0);
+        self.opendal_meta_to_file_stat(&meta, &mut file_stat);
+        Ok(file_stat)
+    }
+
+    async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
+        let file_name = path.to_string_lossy().to_string();
+        let entries = self
+            .op
+            .list(&file_name)
+            .await
+            .map_err(opendal_error_to_errno)?;
+        entries
+            .iter()
+            .map(|entry| {
+                let path = Path::new(entry.path());
+                let mut file_stat = FileStat::new_file_filestat_with_path(path, 0);
+                self.opendal_meta_to_file_stat(entry.metadata(), &mut file_stat);
+                debug!("read dir file stat: {:?}", file_stat);
+                Ok(file_stat)
+            })
+            .collect()
+    }
+
+    async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
+        let file_stat = self.stat(path).await?; // fails with the mapped errno if `path` is absent
+        debug_assert!(file_stat.kind == RegularFile);
+
+        let mut file = OpenedFile::new(file_stat);
+        let file_name = path.to_string_lossy().to_string();
+        if flags.is_read() {
+            let reader = self
+                .op
+                .reader_with(&file_name)
+                .await
+                .map_err(opendal_error_to_errno)?;
+            file.reader = Some(Box::new(FileReaderImpl { reader }));
+        }
+        if flags.is_write() {
+            let writer = self
+                .op
+                .writer_with(&file_name)
+                .await
+                .map_err(opendal_error_to_errno)?;
+            file.writer = Some(Box::new(FileWriterImpl { writer }));
+        }
+        Ok(file)
+    }
+
+    async fn open_dir(&self, path: &Path, _flags:
OpenFileFlags) -> Result<OpenedFile> {
+        let file_stat = self.stat(path).await?;
+        debug_assert!(file_stat.kind == Directory);
+
+        Ok(OpenedFile::new(file_stat))
+    }
+
+    async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
+        let file_name = path.to_string_lossy();
+        let created = self.op.write(&file_name, Vec::<u8>::new()).await;
+        created.map_err(opendal_error_to_errno)?; // open_file() stats the path, so it must exist
+        self.open_file(path, flags).await
+    }
+
+    async fn create_dir(&self, path: &Path) -> Result<FileStat> {
+        let file_name = path.to_string_lossy().to_string();
+        self.op
+            .create_dir(&file_name)
+            .await
+            .map_err(opendal_error_to_errno)?;
+        self.stat(path).await
+    }
+
+    async fn set_attr(&self, _path: &Path, _file_stat: &FileStat, _flush: bool) -> Result<()> {
+        // no need to implement
+        Ok(())
+    }
+
+    async fn remove_file(&self, path: &Path) -> Result<()> {
+        let file_name = path.to_string_lossy().to_string();
+        self.op
+            .remove(vec![file_name])
+            .await
+            .map_err(opendal_error_to_errno)
+    }
+
+    async fn remove_dir(&self, path: &Path) -> Result<()> {
+        //todo:: need to consider keeping the behavior of posix remove dir when the dir is not empty
+        self.remove_file(path).await
+    }
+
+    fn get_capacity(&self) -> Result<FileSystemCapacity> {
+        Ok(FileSystemCapacity {})
+    }
+}
+
+struct FileReaderImpl {
+    reader: opendal::Reader,
+}
+
+#[async_trait]
+impl FileReader for FileReaderImpl {
+    async fn read(&mut self, offset: u64, size: u32) -> Result<Bytes> {
+        let v = self
+            .reader
+            .read(offset..offset.saturating_add(u64::from(size))) // end is clamped, never wraps
+            .await
+            .map_err(opendal_error_to_errno)?;
+        Ok(v.to_bytes())
+    }
+}
+
+struct FileWriterImpl {
+    writer: opendal::Writer,
+}
+
+#[async_trait]
+impl FileWriter for FileWriterImpl {
+    async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
+        self.writer // NOTE(review): `_offset` is unused; bytes reach the writer in call order
+            .write(data.to_vec())
+            .await
+            .map_err(opendal_error_to_errno)?;
+        Ok(data.len() as u32)
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.writer.close().await.map_err(opendal_error_to_errno)?;
+        Ok(())
+    }
+}
+
+fn opendal_error_to_errno(err: opendal::Error) -> fuse3::Errno { +
debug!("opendal_error2errno: {:?}", err);
+    match err.kind() {
+        ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
+        ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
+        ErrorKind::NotFound => Errno::from(libc::ENOENT),
+        ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
+        ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
+        ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
+        ErrorKind::RateLimited => Errno::from(libc::EBUSY),
+        _ => Errno::from(libc::EIO), // unmapped kinds are I/O failures, not a missing path
+    }
+}
+
+fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
+    match mode {
+        EntryMode::DIR => Directory,
+        _ => RegularFile, // every non-directory mode is reported as a regular file
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use opendal::layers::LoggingLayer;
+    use opendal::{services, Operator};
+
+    async fn test_s3_stat() { // NOTE(review): carries no test attribute, so the harness never runs it
+        let mut builder = services::S3::default();
+        builder
+            .access_key_id("") // Replace with your AWS access key
+            .secret_access_key("") // Replace with your AWS secret key
+            .region(""); // Replace with your AWS region
+
+        // Init an operator
+        let op = Operator::new(builder)
+            .expect("opendal create failed")
+            .layer(LoggingLayer::default())
+            .finish();
+
+        let meta = op.stat_with("s1/fileset1/");
+        println!("{:?}", meta.await);
+    }
+}
diff --git a/clients/filesystem-fuse/src/opened_file.rs b/clients/filesystem-fuse/src/opened_file.rs
index 5bc961c9a6b..0c630e07217 100644
--- a/clients/filesystem-fuse/src/opened_file.rs
+++ b/clients/filesystem-fuse/src/opened_file.rs
@@ -122,6 +122,32 @@ pub(crate) struct FileHandle {
 // OpenFileFlags is the open file flags for the file system.
pub(crate) struct OpenFileFlags(pub(crate) u32);
 
+impl OpenFileFlags {
+    pub fn is_read(&self) -> bool {
+        (libc::O_WRONLY as u32 & self.0) == 0 // readable unless the write-only bit is set
+    }
+
+    pub fn is_write(&self) -> bool {
+        (self.0 & (libc::O_WRONLY | libc::O_RDWR) as u32) != 0 // either write-capable bit
+    }
+
+    pub fn is_append(&self) -> bool {
+        (libc::O_APPEND as u32 & self.0) != 0 // O_APPEND bit present
+    }
+
+    pub fn is_create(&self) -> bool {
+        (libc::O_CREAT as u32 & self.0) != 0 // O_CREAT bit present
+    }
+
+    pub fn is_truncate(&self) -> bool {
+        (libc::O_TRUNC as u32 & self.0) != 0 // O_TRUNC bit present
+    }
+
+    pub fn is_exclusive(&self) -> bool {
+        (libc::O_EXCL as u32 & self.0) != 0 // O_EXCL bit present
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;