Skip to content

Commit

Permalink
Support s3
Browse files Browse the repository at this point in the history
  • Loading branch information
diqiu50 committed Dec 26, 2024
1 parent 0eecbd8 commit f78adee
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 14 deletions.
1 change: 1 addition & 0 deletions clients/filesystem-fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ futures-util = "0.3.30"
libc = "0.2.168"
log = "0.4.22"
once_cell = "1.20.2"
opendal = { version = "0.46.0", features = ["services-s3"] }
reqwest = { version = "0.12.9", features = ["json"] }
serde = { version = "1.0.216", features = ["derive"] }
tokio = { version = "1.38.0", features = ["full"] }
Expand Down
31 changes: 17 additions & 14 deletions clients/filesystem-fuse/src/gvfs_fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
use crate::config::AppConfig;
use crate::default_raw_filesystem::DefaultRawFileSystem;
use crate::error::ErrorCode::{InvalidConfig, UnSupportedFilesystem};
use crate::filesystem::FileSystemContext;
use crate::filesystem::{FileSystemContext, PathFileSystem};
use crate::fuse_api_handle::FuseApiHandle;
use crate::fuse_server::FuseServer;
use crate::gravitino_client::GravitinoClient;
use crate::gvfs_fileset_fs::GvfsFilesetFs;
use crate::memory_filesystem::MemoryFileSystem;
use crate::open_dal_filesystem::OpenDalFileSystem;
use crate::utils::GvfsResult;
use log::info;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -188,22 +189,24 @@ pub async fn create_gvfs_filesystem(
.get_fileset(&catalog, &schema, &fileset)
.await?
.storage_location;
let (_schema, location) = extract_storage_filesystem(&location).unwrap();

// todo need to replace the inner filesystem with the real storage filesystem
let inner_fs = MemoryFileSystem::new().await;

let fs = GvfsFilesetFs::new(
Box::new(inner_fs),
Path::new(&location),
client,
config,
fs_context,
)
.await;
let (schema, location) = extract_storage_filesystem(&location).unwrap();

let inner_fs = create_fs_by_schema(&schema, config, fs_context)?;

let fs = GvfsFilesetFs::new(inner_fs, Path::new(&location), client, config, fs_context).await;
Ok(CreateFsResult::Gvfs(fs))
}

/// Builds the storage filesystem that backs a fileset, selected by the schema of its
/// storage location.
///
/// Returns the boxed [`PathFileSystem`], or the error reported by the backend constructor.
fn create_fs_by_schema(
    schema: &FileSystemSchema,
    config: &AppConfig,
    fs_context: &FileSystemContext,
) -> GvfsResult<Box<dyn PathFileSystem>> {
    let storage_fs = match schema {
        // S3 locations are served through the OpenDAL-based filesystem.
        FileSystemSchema::S3 => {
            OpenDalFileSystem::create_file_system(schema, config, fs_context)?
        }
    };
    Ok(storage_fs)
}

pub fn extract_fileset(path: &str) -> GvfsResult<(String, String, String)> {
if !path.starts_with(FILESET_PREFIX) {
return Err(InvalidConfig.to_error("Invalid fileset path".to_string()));
Expand Down
1 change: 1 addition & 0 deletions clients/filesystem-fuse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod gravitino_client;
mod gvfs_fileset_fs;
mod gvfs_fuse;
mod memory_filesystem;
mod open_dal_filesystem;
mod opened_file;
mod opened_file_manager;
mod utils;
Expand Down
265 changes: 265 additions & 0 deletions clients/filesystem-fuse/src/open_dal_filesystem.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::config::AppConfig;
use crate::filesystem::{
FileReader, FileStat, FileSystemCapacity, FileSystemContext, FileWriter, PathFileSystem, Result,
};
use crate::gvfs_fuse::FileSystemSchema;
use crate::opened_file::{OpenFileFlags, OpenedFile};
use crate::utils::GvfsResult;
use async_trait::async_trait;
use bytes::Bytes;
use fuse3::FileType::{Directory, RegularFile};
use fuse3::{Errno, FileType, Timestamp};
use log::debug;
use opendal::layers::LoggingLayer;
use opendal::services::S3;
use opendal::{Builder, EntryMode, ErrorKind, Metadata, Operator};
use std::path::Path;
use std::time::SystemTime;

/// Filesystem that implements `PathFileSystem` on top of an OpenDAL `Operator`.
pub(crate) struct OpenDalFileSystem {
// Operator through which every storage request of this filesystem is issued.
op: Operator,
}

impl OpenDalFileSystem {}

impl OpenDalFileSystem {
fn new(op: Operator, _config: &AppConfig, _fs_context: &FileSystemContext) -> Self {
Self { op: op }
}

pub(crate) fn create_file_system(
schema: &FileSystemSchema,
config: &AppConfig,
fs_context: &FileSystemContext,
) -> GvfsResult<Box<dyn PathFileSystem>> {
match schema {
FileSystemSchema::S3 => {
let builder = S3::from_map(config.extent_config.clone());
let op = Operator::new(builder)
.expect("opendal create failed")
.layer(LoggingLayer::default())
.finish();
Ok(Box::new(OpenDalFileSystem::new(op, config, fs_context)))
}
}
}

fn opendal_meta_to_file_stat(&self, meta: &Metadata, file_stat: &mut FileStat) {
let now = SystemTime::now();
let mtime = meta.last_modified().map(|x| x.into()).unwrap_or(now);

file_stat.size = meta.content_length();
file_stat.kind = opendal_filemode_to_filetype(meta.mode());
file_stat.ctime = Timestamp::from(mtime);
file_stat.atime = Timestamp::from(now);
file_stat.mtime = Timestamp::from(mtime);
}
}

#[async_trait]
impl PathFileSystem for OpenDalFileSystem {
async fn init(&self) -> Result<()> {
Ok(())
}

async fn stat(&self, path: &Path) -> Result<FileStat> {
let file_name = path.to_string_lossy().to_string();
let meta = self
.op
.stat_with(&file_name)
.await
.map_err(opendal_error_to_errno)?;

let mut file_stat = FileStat::new_file_filestat_with_path(path, 0);
self.opendal_meta_to_file_stat(&meta, &mut file_stat);
Ok(file_stat)
}

async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
let file_name = path.to_string_lossy().to_string();
let entries = self
.op
.list(&file_name)
.await
.map_err(opendal_error_to_errno)?;
entries
.iter()
.map(|entry| {
let path = Path::new(entry.path());
let mut file_stat = FileStat::new_file_filestat_with_path(path, 0);
self.opendal_meta_to_file_stat(entry.metadata(), &mut file_stat);
debug!("read dir file stat: {:?}", file_stat);
Ok(file_stat)
})
.collect()
}

async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
let file_stat = self.stat(path).await?;
debug_assert!(file_stat.kind == RegularFile);

let mut file = OpenedFile::new(file_stat);
let file_name = path.to_string_lossy().to_string();
if flags.is_read() {
let reader = self
.op
.reader_with(&file_name)
.await
.map_err(opendal_error_to_errno)?;
file.reader = Some(Box::new(FileReaderImpl { reader }));
}
if flags.is_write() {
let writer = self
.op
.writer_with(&file_name)
.await
.map_err(opendal_error_to_errno)?;
file.writer = Some(Box::new(FileWriterImpl { writer }));
}
Ok(file)
}

async fn open_dir(&self, path: &Path, _flags: OpenFileFlags) -> Result<OpenedFile> {
let file_stat = self.stat(path).await?;
debug_assert!(file_stat.kind == Directory);

let opened_file = OpenedFile::new(file_stat);
Ok(opened_file)
}

async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
self.open_file(path, flags).await
}

async fn create_dir(&self, path: &Path) -> Result<FileStat> {
let file_name = path.to_string_lossy().to_string();
self.op
.create_dir(&file_name)
.await
.map_err(opendal_error_to_errno)?;
let file_stat = self.stat(path).await?;
Ok(file_stat)
}

async fn set_attr(&self, _path: &Path, _file_stat: &FileStat, _flush: bool) -> Result<()> {
// no need to implement
Ok(())
}

async fn remove_file(&self, path: &Path) -> Result<()> {
let file_name = path.to_string_lossy().to_string();
self.op
.remove(vec![file_name])
.await
.map_err(opendal_error_to_errno)
}

async fn remove_dir(&self, path: &Path) -> Result<()> {
//todo:: need to consider keeping the behavior of posix remove dir when the dir is not empty
self.remove_file(path).await
}

fn get_capacity(&self) -> Result<FileSystemCapacity> {
Ok(FileSystemCapacity {})
}
}

/// `FileReader` implementation that serves reads from an OpenDAL reader.
struct FileReaderImpl {
// OpenDAL reader used for every ranged read of the opened file.
reader: opendal::Reader,
}

#[async_trait]
impl FileReader for FileReaderImpl {
    /// Reads up to `size` bytes starting at byte `offset`.
    ///
    /// Backend failures are translated to an errno by `opendal_error_to_errno`.
    async fn read(&mut self, offset: u64, size: u32) -> Result<Bytes> {
        // Saturate instead of overflowing when `offset` is close to `u64::MAX`.
        let end = offset.saturating_add(u64::from(size));
        let v = self
            .reader
            .read(offset..end)
            .await
            .map_err(opendal_error_to_errno)?;
        Ok(v.to_bytes())
    }
}

/// `FileWriter` implementation that forwards writes to an OpenDAL writer.
struct FileWriterImpl {
// OpenDAL writer that receives the data and is closed by `close`.
writer: opendal::Writer,
}

#[async_trait]
impl FileWriter for FileWriterImpl {
async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
self.writer
.write(data.to_vec())
.await
.map_err(opendal_error_to_errno)?;
Ok(data.len() as u32)
}

async fn close(&mut self) -> Result<()> {
self.writer.close().await.map_err(opendal_error_to_errno)?;
Ok(())
}
}

/// Translates an OpenDAL error into the errno reported back through FUSE.
///
/// Error kinds without a dedicated mapping are reported as `EIO`, so that unrelated
/// backend failures are not mistaken for a missing file.
fn opendal_error_to_errno(err: opendal::Error) -> fuse3::Errno {
    debug!("opendal_error2errno: {:?}", err);
    match err.kind() {
        ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
        ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
        ErrorKind::NotFound => Errno::from(libc::ENOENT),
        ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
        ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
        ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
        ErrorKind::RateLimited => Errno::from(libc::EBUSY),
        // Anything else is a generic I/O failure, not "no such file".
        _ => Errno::from(libc::EIO),
    }
}

/// Maps an OpenDAL entry mode to a FUSE file type: `DIR` becomes a directory and
/// every other mode is treated as a regular file.
fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
    if matches!(mode, EntryMode::DIR) {
        Directory
    } else {
        RegularFile
    }
}

#[cfg(test)]
mod test {
    use opendal::layers::LoggingLayer;
    use opendal::{services, Operator};

    /// Manual smoke test: stats a path on a real S3 bucket and prints the result.
    ///
    /// Ignored by default because the placeholders below must be replaced with real
    /// credentials first; run it explicitly with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires real S3 credentials and a reachable bucket"]
    async fn test_s3_stat() {
        let mut builder = services::S3::default();
        builder
            .access_key_id("<Your AWS Access Key>") // Replace with your AWS access key
            .secret_access_key("<Your AWS Secret Key>") // Replace with your AWS secret key
            .region("<Your AWS region") // Replace with your S3 bucket region
            .bucket("<Your S3 bucket name>"); // Replace with your S3 bucket name

        // Init an operator
        let op = Operator::new(builder)
            .expect("opendal create failed")
            .layer(LoggingLayer::default())
            .finish();

        let meta = op.stat_with("s1/fileset1/");
        println!("{:?}", meta.await);
    }
}
26 changes: 26 additions & 0 deletions clients/filesystem-fuse/src/opened_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,32 @@ pub(crate) struct FileHandle {
/// Open flags of a file, kept as the raw `u32` bit mask that is tested against the
/// `libc::O_*` constants.
pub(crate) struct OpenFileFlags(pub(crate) u32);

impl OpenFileFlags {
    /// True when the `O_WRONLY` bit is clear.
    pub fn is_read(&self) -> bool {
        !self.has_bit(libc::O_WRONLY)
    }

    /// True when either the `O_WRONLY` or the `O_RDWR` bit is set.
    pub fn is_write(&self) -> bool {
        self.has_bit(libc::O_WRONLY) || self.has_bit(libc::O_RDWR)
    }

    /// True when the `O_APPEND` bit is set.
    pub fn is_append(&self) -> bool {
        self.has_bit(libc::O_APPEND)
    }

    /// True when the `O_CREAT` bit is set.
    pub fn is_create(&self) -> bool {
        self.has_bit(libc::O_CREAT)
    }

    /// True when the `O_TRUNC` bit is set.
    pub fn is_truncate(&self) -> bool {
        self.has_bit(libc::O_TRUNC)
    }

    /// True when the `O_EXCL` bit is set.
    pub fn is_exclusive(&self) -> bool {
        self.has_bit(libc::O_EXCL)
    }

    /// Tests whether any bit of `flag` is present in the stored mask.
    fn has_bit(&self, flag: libc::c_int) -> bool {
        (self.0 & flag as u32) != 0
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit f78adee

Please sign in to comment.