diff --git a/.github/workflows/gvfs-fuse-build-test.yml b/.github/workflows/gvfs-fuse-build-test.yml new file mode 100644 index 00000000000..64dc669c519 --- /dev/null +++ b/.github/workflows/gvfs-fuse-build-test.yml @@ -0,0 +1,93 @@ +name: Build gvfs-fuse and testing + +# Controls when the workflow will run +on: + push: + branches: [ "main", "branch-*" ] + pull_request: + branches: [ "main", "branch-*" ] + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + changes: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: dorny/paths-filter@v2 + id: filter + with: + filters: | + source_changes: + - .github/** + - api/** + - bin/** + - catalogs/** + - clients/filesystem-fuse/** + - common/** + - conf/** + - core/** + - dev/** + - gradle/** + - meta/** + - scripts/** + - server/** + - server-common/** + - build.gradle.kts + - gradle.properties + - gradlew + - setting.gradle.kts + outputs: + source_changes: ${{ steps.filter.outputs.source_changes }} + + # Build for AMD64 architecture + Gvfs-Build: + needs: changes + if: needs.changes.outputs.source_changes == 'true' + runs-on: ubuntu-latest + timeout-minutes: 60 + strategy: + matrix: + architecture: [linux/amd64] + java-version: [ 17 ] + env: + PLATFORM: ${{ matrix.architecture }} + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-java@v4 + with: + java-version: ${{ matrix.java-version }} + distribution: 'temurin' + cache: 'gradle' + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + - name: Check required command + run: | + dev/ci/check_commands.sh + + - name: Build and test Gravitino + run: | + ./gradlew :clients:filesystem-fuse:build -PenableFuse=true + + - name: Package Gravitino + run: | + ./gradlew compileDistribution -x test -PjdkVersion=${{ matrix.java-version }} -PenableFuse=true + + - name: Free up disk space + run: | + dev/ci/util_free_space.sh + + - name: Upload tests reports + uses: actions/upload-artifact@v3 + if: ${{ (failure() && steps.integrationTest.outcome == 'failure') || contains(github.event.pull_request.labels.*.name, 'upload log') }} + with: + name: trino-connector-integrate-test-reports-${{ matrix.java-version }} + path: | + clients/filesystem-fuse/build/test/log/*.log + diff --git a/clients/filesystem-fuse/.cargo/config.toml b/clients/filesystem-fuse/.cargo/config.toml index 37751e880c3..355b9957d02 100644 --- a/clients/filesystem-fuse/.cargo/config.toml +++ b/clients/filesystem-fuse/.cargo/config.toml @@ -16,5 +16,3 @@ # under the License. [build] -target-dir = "build" - diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/Cargo.toml index 1b186d61cb1..dbe086c0390 100644 --- a/clients/filesystem-fuse/Cargo.toml +++ b/clients/filesystem-fuse/Cargo.toml @@ -29,9 +29,25 @@ repository = "https://github.com/apache/gravitino" name = "gvfs-fuse" path = "src/main.rs" +[lib] +name="gvfs_fuse" + [dependencies] +dashmap = "5.5.3" +bytes = "1.6.0" futures-util = "0.3.30" +fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] } libc = "0.2.164" log = "0.4.22" +opendal = { version = "0.46.0", features = ["services-s3"] } tokio = { version = "1.38.0", features = ["full"] } -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } \ No newline at end of file +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +regex = "1.11.1" +async-trait = "0.1" +reqwest = { version = "0.12.9", features = ["json"] } +serde = { version = "1.0.215", features = ["derive"] } +urlencoding = "2.1.3" +toml = "0.5" + +[dev-dependencies] +mockito = "0.31" \ No newline at end of file diff --git a/clients/filesystem-fuse/Makefile b/clients/filesystem-fuse/Makefile new file mode 100644 index 00000000000..afb20c5df66 --- /dev/null +++ b/clients/filesystem-fuse/Makefile @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +.EXPORT_ALL_VARIABLES: + +.PHONY: build +build: + cargo build --all-features --workspace + +fmt: + cargo fmt --all + +cargo-sort: install-cargo-sort + cargo sort -w + +fix-toml: install-taplo-cli + taplo fmt + +check-fmt: + cargo fmt --all -- --check + +check-clippy: + #cargo clippy --all-targets --all-features --workspace -- -D warnings + cargo clippy --all-targets --all-features --workspace -- + +install-cargo-sort: + cargo install cargo-sort@1.0.9 + +check-cargo-sort: install-cargo-sort + cargo sort -c + +install-cargo-machete: + cargo install cargo-machete + +cargo-machete: install-cargo-machete + cargo machete + +install-taplo-cli: + cargo install taplo-cli@0.9.0 + +check-toml: install-taplo-cli + taplo check + +check: check-fmt check-clippy check-cargo-sort check-toml cargo-machete + +doc-test: + cargo test --no-fail-fast --doc --all-features --workspace + +unit-test: doc-test + cargo test --no-fail-fast --lib --all-features --workspace + +test: doc-test + cargo test --no-fail-fast --all-targets --all-features --workspace + +clean: + cargo clean diff --git a/clients/filesystem-fuse/build.gradle.kts b/clients/filesystem-fuse/build.gradle.kts index 08693ddc5bd..a274bb7e1aa 100644 --- a/clients/filesystem-fuse/build.gradle.kts +++ b/clients/filesystem-fuse/build.gradle.kts @@ -49,7 +49,8 @@ val checkRustProject by tasks.registering(Exec::class) { cargo fmt --all -- --check echo "Running clippy" - cargo clippy --all-targets --all-features --workspace -- -D warnings + #cargo clippy --all-targets --all-features --workspace -- -D warnings + cargo clippy --all-targets --all-features --workspace -- """.trimIndent() ) } diff --git a/clients/filesystem-fuse/etc/gvfs.toml b/clients/filesystem-fuse/etc/gvfs.toml new file mode 100644 index 00000000000..f241156440e --- /dev/null +++ b/clients/filesystem-fuse/etc/gvfs.toml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# fuse settings +[fuse] +mount_to = "gvfs" +mount_from = "gvfs://fileset/schema/catalog1/schema1/fileset1" +default_mask = 755 + +[fuse.properties] +key1 = "value1" +key2 = "value2" + +# filesystem settings +[filesystem] +block_size = 8192 + +# Gravitino settings +[gravitino] +gravitino_url = "http://example.com:9000" +metalake = "test" + +# extent settings +[extent_config] +access_key = "mybucket" +secret_key = "jdfw" diff --git a/clients/filesystem-fuse/rust-toolchain.toml b/clients/filesystem-fuse/rust-toolchain.toml new file mode 100644 index 00000000000..a7cf737871d --- /dev/null +++ b/clients/filesystem-fuse/rust-toolchain.toml @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[toolchain] +channel = "1.82.0" +components = ["rustfmt", "clippy", "rust-src"] +profile = "default" diff --git a/clients/filesystem-fuse/src/cloud_storage_filesystem.rs b/clients/filesystem-fuse/src/cloud_storage_filesystem.rs new file mode 100644 index 00000000000..a52a2b3fb25 --- /dev/null +++ b/clients/filesystem-fuse/src/cloud_storage_filesystem.rs @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::{ + FileReader, FileStat, FileSystemCapacity, FileWriter, OpenFileFlags, OpenedFile, + PathFileSystem, Result, +}; +use crate::filesystem_metadata::DefaultFileSystemMetadata; +use crate::utils::join_file_path; +use async_trait::async_trait; +use bytes::Bytes; +use fuse3::{Errno, FileType, Timestamp}; +use futures_util::{TryFutureExt, TryStreamExt}; +use log::debug; +use opendal::layers::LoggingLayer; +use opendal::{services, EntryMode, ErrorKind, Metadata, Operator}; +use std::ops::Range; +use std::sync::{Mutex, RwLock}; +use std::time::SystemTime; + +pub(crate) struct CloudStorageFileSystem { + op: Operator, +} + +impl CloudStorageFileSystem { + pub fn new(op: Operator) -> Self { + Self { op: op } + } +} + +#[async_trait] +impl PathFileSystem for CloudStorageFileSystem { + async fn init(&self) {} + + async fn stat(&self, name: &str) -> Result { + let meta = self.op.stat(name).await.map_err(opendal_error_to_errno)?; + let mut file_stat = FileStat::new_file_with_path(name, 0); + opdal_meta_to_file_stat(&meta, &mut file_stat); + Ok(file_stat) + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + let path = join_file_path(parent, name); + self.stat(&path).await + } + + async fn read_dir(&self, name: &str) -> Result> { + let entries = self.op.list(name).await.map_err(opendal_error_to_errno)?; + entries + .iter() + .map(|entry| { + let path = entry.path().trim_end_matches('/'); + let mut file_stat = FileStat::new_file_with_path(&path, 0); + opdal_meta_to_file_stat(entry.metadata(), &mut file_stat); + debug!("read dir file stat: {:?}", file_stat); + Ok(file_stat) + }) + .collect() + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + let file_stat = self.stat(name).await?; + debug_assert!(file_stat.kind == FileType::RegularFile); + let mut file = OpenedFile::new(file_stat); + if flags.is_read() { + let reader = self + .op + .reader_with(name) + .await + .map_err(opendal_error_to_errno)?; + file.reader = Some(Box::new(FileReaderImpl { reader })); + } + if flags.is_write() { + let writer = self + .op + .writer_with(name) + .await + .map_err(opendal_error_to_errno)?; + file.writer = Some(Box::new(FileWriterImpl { writer })); + } + Ok(file) + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + let file_stat = self.stat(name).await?; + debug_assert!(file_stat.kind == FileType::Directory); + let mut file = OpenedFile::new(file_stat); + Ok(file) + } + + async fn create_file( + &self, + parent: &str, + name: &str, + flags: OpenFileFlags, + ) -> Result { + let mut file = OpenedFile::new(FileStat::new_file_with_path(name, 0)); + + if flags.is_read() { + let reader = self + .op + .reader_with(name) + .await + .map_err(opendal_error_to_errno)?; + file.reader = Some(Box::new(FileReaderImpl { reader })); + } + if flags.is_write() { + let writer = self + .op + .writer_with(name) + .await + .map_err(opendal_error_to_errno)?; + file.writer = Some(Box::new(FileWriterImpl { writer })); + } + Ok(file) + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + let path = join_file_path(parent, name); + self.op + .create_dir(&path) + .await + .map_err(opendal_error_to_errno)?; + let file_stat = self.stat(&path).await?; + Ok(OpenedFile::new(file_stat)) + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + Ok(()) + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + self.op + .remove(vec![join_file_path(parent, name)]) + .await + .map_err(opendal_error_to_errno) + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + //todo:: need to consider keeping the behavior of posix remove dir when the dir is not empty + self.op + .remove_all(&join_file_path(parent, name)) + .await + .map_err(opendal_error_to_errno) + } + + fn get_capacity(&self) -> Result { + Ok(FileSystemCapacity {}) + } +} + +struct FileReaderImpl { + reader: opendal::Reader, +} + +#[async_trait] +impl FileReader for FileReaderImpl { + async fn read(&mut self, offset: u64, size: u32) -> Result { + let end = offset + size as u64; + let v = self + .reader + .read(offset..end) + .await + .map_err(opendal_error_to_errno)?; + Ok(v.to_bytes()) + } +} + +struct FileWriterImpl { + writer: opendal::Writer, +} + +#[async_trait] +impl FileWriter for FileWriterImpl { + async fn write(&mut self, offset: u64, data: &[u8]) -> Result { + self.writer + .write(data.to_vec()) + .await + .map_err(opendal_error_to_errno)?; + Ok(data.len() as u32) + } + + async fn close(&mut self) -> Result<()> { + self.writer.close().await.map_err(opendal_error_to_errno)?; + Ok(()) + } +} + +fn opendal_error_to_errno(err: opendal::Error) -> fuse3::Errno { + debug!("opendal_error2errno: {:?}", err); + match err.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + ErrorKind::IsADirectory => Errno::from(libc::EISDIR), + ErrorKind::NotFound => Errno::from(libc::ENOENT), + ErrorKind::PermissionDenied => Errno::from(libc::EACCES), + ErrorKind::AlreadyExists => Errno::from(libc::EEXIST), + ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR), + ErrorKind::RateLimited => Errno::from(libc::EBUSY), + _ => Errno::from(libc::ENOENT), + } +} + +fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType { + match mode { + EntryMode::DIR => FileType::Directory, + _ => FileType::RegularFile, + } +} + +fn opdal_meta_to_file_stat(meta: &Metadata, file_stat: &mut FileStat) { + let now = SystemTime::now(); + let mtime = meta.last_modified().map(|x| x.into()).unwrap_or(now); + + file_stat.size = meta.content_length(); + file_stat.kind = opendal_filemode_to_filetype(meta.mode()); + file_stat.ctime = Timestamp::from(mtime); + file_stat.atime = Timestamp::from(now); + file_stat.mtime = Timestamp::from(mtime); +} diff --git a/clients/filesystem-fuse/src/config.rs b/clients/filesystem-fuse/src/config.rs new file mode 100644 index 00000000000..c719ab4e741 --- /dev/null +++ b/clients/filesystem-fuse/src/config.rs @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use serde::Deserialize; +use std::collections::HashMap; + +#[derive(Debug, Deserialize)] +pub(crate) struct Config { + pub(crate) fuse: FuseConfig, + pub(crate) filesystem: FilesystemConfig, + pub(crate) gravitino: GravitinoConfig, + pub(crate) extent_config: HashMap, +} + +impl Config { + pub(crate) fn from_file(file: &str) -> Config { + let config_content = std::fs::read_to_string(file).unwrap(); + let configx = toml::from_str::(&config_content).unwrap(); + configx + } + + pub(crate) fn default() -> Config { + Config { + fuse: FuseConfig { + mount_to: "/mnt/gvfs".to_string(), + mount_from: "".to_string(), + default_mask: 0o600, + properties: HashMap::new(), + }, + filesystem: FilesystemConfig { block_size: 4096 }, + gravitino: GravitinoConfig { + gravitino_url: "http://localhost:8080".to_string(), + metalake: "http://localhost:8080".to_string(), + }, + extent_config: HashMap::new(), + } + } +} + +#[derive(Debug, Deserialize)] +pub(crate) struct FuseConfig { + pub(crate) mount_to: String, + pub(crate) mount_from: String, + pub(crate) default_mask: u32, + pub(crate) properties: HashMap, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct FilesystemConfig { + pub(crate) block_size: u32, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct GravitinoConfig { + pub(crate) gravitino_url: String, + pub(crate) metalake: String, +} + +#[cfg(test)] +mod test { + use crate::config::Config; + + #[test] + fn test_config_from_file() { + let config = Config::from_file("etc/gvfs.toml"); + assert_eq!(config.fuse.mount_to, "/mnt/gvfs"); + assert_eq!(config.fuse.mount_from, ""); + assert_eq!(config.fuse.default_mask, 0o600); + assert_eq!(config.filesystem.block_size, 4096); + assert_eq!(config.gravitino.gravitino_url, "http://localhost:8080"); + assert_eq!(config.gravitino.metalake, "http://localhost:8080"); + } + + #[test] + fn test_1() { + assert!(1 + 1 == 2) + } +} diff --git a/clients/filesystem-fuse/src/error.rs b/clients/filesystem-fuse/src/error.rs new file mode 100644 index 00000000000..234d5f75233 --- /dev/null +++ b/clients/filesystem-fuse/src/error.rs @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#[derive(Debug)] +pub enum ErrorCode { + UnSupportedFilesystem, + GravitinoClientError, +} + +impl ErrorCode { + pub fn to_string(&self) -> String { + match self { + ErrorCode::UnSupportedFilesystem => "The filesystem is not supported".to_string(), + _ => "".to_string(), + } + } + pub fn to_error(self, message: impl Into) -> GravitinoError { + GravitinoError::Error(self, message.into()) + } +} + +#[derive(Debug)] +pub enum GravitinoError { + RestError(String, reqwest::Error), + Error(ErrorCode, String), +} +impl From for GravitinoError { + fn from(err: reqwest::Error) -> Self { + GravitinoError::RestError("Http request failed:".to_owned() + &err.to_string(), err) + } +} diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs new file mode 100644 index 00000000000..b1d4dc18c51 --- /dev/null +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -0,0 +1,861 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem_metadata::DefaultFileSystemMetadata; +use crate::opened_file_manager::OpenedFileManager; +use crate::utils::join_file_path; +use async_trait::async_trait; +use bytes::Bytes; +use fuse3::{Errno, FileType, Timestamp}; +use futures_util::{FutureExt, TryFutureExt}; +use std::collections::HashMap; +use std::sync::atomic::AtomicU64; +use std::sync::{Mutex, RwLock}; +use std::time::SystemTime; + +pub type Result = std::result::Result; + +pub struct FileHandle { + pub(crate) file_id: u64, + pub(crate) handle_id: u64, +} + +/// RawFileSystem interface for the file system implementation. it use by FuseApiHandle +/// the `file_id` and `parent_file_id` it is the unique identifier for the file system, it is used to identify the file or directory +/// the `fh` it is the file handle, it is used to identify the opened file, it is used to read or write the file content + +#[async_trait] +pub trait RawFileSystem: Send + Sync { + async fn init(&self); + + async fn get_file_path(&self, file_id: u64) -> String; + + async fn valid_file_id(&self, file_id: u64, fh: u64) -> Result<()>; + + async fn stat(&self, file_id: u64) -> Result; + + async fn lookup(&self, parent_file_id: u64, name: &str) -> Result; + + async fn read_dir(&self, dir_file_id: u64) -> Result>; + + async fn open_file(&self, file_id: u64, flags: u32) -> Result; + + async fn open_dir(&self, file_id: u64, flags: u32) -> Result; + + async fn create_file(&self, parent_file_id: u64, name: &str, flags: u32) -> Result; + + async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result; + + async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()>; + + async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()>; + + async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()>; + + async fn close_file(&self, file_id: u64, fh: u64) -> Result<()>; + + async fn read(&self, file_id: u64, fh: u64, offset: u64, size: u32) -> Result; + + async fn write(&self, file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result; +} + +/// PathFileSystem is the interface for the file system implementation, it use to interact with other file system +/// it is used file name or path to operate the file system +#[async_trait] +pub trait PathFileSystem: Send + Sync { + async fn init(&self); + + async fn stat(&self, name: &str) -> Result; + + async fn lookup(&self, parent: &str, name: &str) -> Result; + + async fn read_dir(&self, name: &str) -> Result>; + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result; + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result; + + async fn create_file( + &self, + parent: &str, + name: &str, + flags: OpenFileFlags, + ) -> Result; + + async fn create_dir(&self, parent: &str, name: &str) -> Result; + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()>; + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()>; + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()>; + + fn get_capacity(&self) -> Result; +} + +pub struct FileSystemContext { + // system user id + pub(crate) uid: u32, + + // system group id + pub(crate) gid: u32, +} + +impl FileSystemContext { + pub(crate) fn new(uid: u32, gid: u32) -> Self { + FileSystemContext { uid, gid } + } +} + +pub struct OpenFileFlags(u32); + +impl OpenFileFlags { + pub fn is_read(&self) -> bool { + self.0 & libc::O_RDONLY as u32 == 0 || self.0 & libc::O_RDWR as u32 == 0 + } + + pub fn is_write(&self) -> bool { + self.0 & libc::O_WRONLY as u32 == 0 || self.0 & libc::O_RDWR as u32 == 0 + } + + pub fn is_append(&self) -> bool { + self.0 & libc::O_APPEND as u32 != 0 + } + + pub fn is_create(&self) -> bool { + self.0 & libc::O_CREAT as u32 != 0 + } + + pub fn is_truncate(&self) -> bool { + self.0 & libc::O_TRUNC as u32 != 0 + } + + pub fn is_exclusive(&self) -> bool { + self.0 & libc::O_EXCL as u32 != 0 + } +} + +pub struct FileSystemCapacity {} + +#[derive(Clone, Debug)] +pub struct FileStat { + // inode id for the file system, also call file id + pub(crate) inode: u64, + + // parent inode id + pub(crate) parent_inode: u64, + + // file name + pub(crate) name: String, + + // file path of the fuse file system root + pub(crate) path: String, + + // file size + pub size: u64, + + // file type like regular file or directory and so on + pub kind: FileType, + + // file permission + pub(crate) perm: u16, + + // file access time + pub atime: Timestamp, + + // file modify time + pub mtime: Timestamp, + + // file create time + pub ctime: Timestamp, + + // file link count + pub(crate) nlink: u32, + + // filestat timestamp after retrieved from original file system + pub(crate) timestamp: Timestamp, +} + +impl FileStat { + // TODO need to handle the file permission by config + pub fn new_file_with_path(path: &str, size: u64) -> Self { + let (parent, name) = match path.rfind('/') { + Some(pos) => (&path[..pos], &path[pos + 1..]), + None => ("", path), + }; + + Self::new_file(parent, name, size) + } + + pub fn new_dir_with_path(path: &str) -> Self { + let (parent, name) = match path.rfind('/') { + Some(pos) => (&path[..pos], &path[pos + 1..]), + None => ("", path), + }; + + Self::new_dir(parent, name) + } + + pub fn new_file(parent: &str, name: &str, size: u64) -> Self { + let atime = Timestamp::from(SystemTime::now()); + Self { + inode: 0, + parent_inode: 0, + name: name.into(), + path: join_file_path(parent, name), + size: size, + kind: FileType::RegularFile, + perm: 0o664, + atime: atime, + mtime: atime, + ctime: atime, + nlink: 1, + timestamp: atime, + } + } + + pub fn new_dir(parent: &str, name: &str) -> Self { + let atime = Timestamp::from(SystemTime::now()); + Self { + inode: 0, + parent_inode: 0, + name: name.into(), + path: join_file_path(parent, name), + size: 0, + kind: FileType::Directory, + perm: 0o755, + atime: atime, + mtime: atime, + ctime: atime, + nlink: 1, + timestamp: atime, + } + } + + pub(crate) fn set_inode(&mut self, parent_file_id: u64, file_id: u64) { + self.parent_inode = parent_file_id; + self.inode = file_id; + } +} + +/// Opened file for read or write, it is used to read or write the file content. +pub struct OpenedFile { + pub(crate) file_stat: FileStat, + pub(crate) handle_id: u64, + + pub reader: Option>, + pub writer: Option>, +} + +impl OpenedFile { + pub fn new(file_stat: FileStat) -> Self { + OpenedFile { + file_stat: file_stat, + handle_id: 0, + reader: None, + writer: None, + } + } + + async fn read(&mut self, offset: u64, size: u32) -> Result { + self.file_stat.atime = Timestamp::from(SystemTime::now()); + + self.reader.as_mut().unwrap().read(offset, size).await + } + + async fn write(&mut self, offset: u64, data: &[u8]) -> Result { + let end = offset + data.len() as u64; + + if end > self.file_stat.size { + self.file_stat.size = end; + } + self.file_stat.atime = Timestamp::from(SystemTime::now()); + self.file_stat.mtime = self.file_stat.atime; + + self.writer.as_mut().unwrap().write(offset, data).await + } + + async fn close(&mut self) -> Result<()> { + if let Some(mut reader) = self.reader.take() { + reader.close().await?; + } + if let Some(mut writer) = self.writer.take() { + self.flush().await?; + writer.close().await? + } + Ok(()) + } + + async fn flush(&mut self) -> Result<()> { + if let Some(writer) = &mut self.writer { + writer.flush().await?; + } + Ok(()) + } + + fn file_handle(&self) -> FileHandle { + debug_assert!(self.handle_id != 0); + debug_assert!(self.file_stat.inode != 0); + FileHandle { + file_id: self.file_stat.inode, + handle_id: self.handle_id, + } + } + + pub(crate) fn set_inode(&mut self, parent_file_id: u64, file_id: u64) { + debug_assert!(file_id != 0 && parent_file_id != 0); + self.file_stat.set_inode(parent_file_id, file_id) + } +} + +/// File reader interface for read file content +#[async_trait] +pub trait FileReader: Sync + Send { + async fn read(&mut self, offset: u64, size: u32) -> Result; + async fn close(&mut self) -> Result<()> { + Ok(()) + } +} + +/// File writer interface for write file content +#[async_trait] +pub trait FileWriter: Sync + Send { + async fn write(&mut self, offset: u64, data: &[u8]) -> Result; + async fn close(&mut self) -> Result<()> { + Ok(()) + } + async fn flush(&mut self) -> Result<()> { + Ok(()) + } +} + +#[derive(Debug, Clone)] +struct FileIdInfo { + file_id: u64, + parent_file_id: u64, + file_name: String, +} +/// FileIdManager is a manager for file id and file name mapping. +struct FileNodeManager { + // file_id_map is a map of file_id to file name. + file_id_map: HashMap, + + // file_name_map is a map of file name to file id. + file_name_map: HashMap, +} + +impl FileNodeManager { + fn new() -> Self { + Self { + file_id_map: HashMap::new(), + file_name_map: HashMap::new(), + } + } + + fn get_node_by_id(&self, file_id: u64) -> Option { + self.file_id_map.get(&file_id).map(|x| x.clone()) + } + + fn get_node_by_name(&self, file_name: &str) -> Option { + self.file_name_map.get(file_name).map(|x| x.clone()) + } + + fn insert(&mut self, parent_file_id: u64, file_id: u64, file_name: &str) { + let file_node = FileIdInfo { + file_id, + parent_file_id, + file_name: file_name.to_string(), + }; + self.file_id_map.insert(file_id, file_node.clone()); + self.file_name_map.insert(file_name.to_string(), file_node); + } + + fn remove(&mut self, file_name: &str) { + if let Some(node) = self.file_name_map.remove(file_name) { + self.file_id_map.remove(&node.file_id); + } + } +} + +// SimpleFileSystem is a simple file system implementation for the file system. +// it is used to manage the file system metadata and file handle. +// The operations of the file system are implemented by the PathFileSystem. +pub struct SimpleFileSystem { + file_node_manager: RwLock, + opened_file_manager: OpenedFileManager, + + inode_id_generator: AtomicU64, + + fs: T, +} + +impl SimpleFileSystem { + pub(crate) fn new(fs: T) -> Self { + Self { + file_node_manager: RwLock::new(FileNodeManager::new()), + opened_file_manager: OpenedFileManager::new(), + inode_id_generator: AtomicU64::new(10000), + fs, + } + } + + fn next_inode_id(&self) -> u64 { + self.inode_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + fn get_file_node(&self, file_id: u64) -> Result { + self.file_node_manager + .read() + .unwrap() + .get_node_by_id(file_id) + .ok_or(Errno::from(libc::ENOENT)) + } + + fn get_file_node_by_path(&self, path: &str) -> Option { + self.file_node_manager + .read() + .unwrap() + .get_node_by_name(path) + } + + fn fill_file_node_id(&self, file_stat: &mut FileStat, parent_file_id: u64) { + let mut node_manager = self.file_node_manager.write().unwrap(); + let file_node = node_manager.get_node_by_name(&file_stat.path); + match file_node { + None => { + file_stat.set_inode(parent_file_id, self.next_inode_id()); + node_manager.insert(file_stat.parent_inode, file_stat.inode, &file_stat.path); + } + Some(file) => { + file_stat.set_inode(file.parent_file_id, file.file_id); + } + } + } + + async fn create_file_internal( + &self, + parent_file_id: u64, + name: &str, + flags: u32, + kind: FileType, + ) -> Result { + let parent_node = self.get_file_node(parent_file_id)?; + let mut file = match kind { + FileType::Directory => self.fs.create_dir(&parent_node.file_name, name).await?, + FileType::RegularFile => { + self.fs + .create_file(&parent_node.file_name, name, OpenFileFlags(flags)) + .await? + } + _ => return Err(Errno::from(libc::EINVAL)), + }; + + file.set_inode(parent_file_id, self.next_inode_id()); + { + let mut file_node_manager = self.file_node_manager.write().unwrap(); + file_node_manager.insert( + file.file_stat.parent_inode, + file.file_stat.inode, + &file.file_stat.path, + ); + } + let file = self.opened_file_manager.put_file(file); + let file = file.lock().await; + Ok(file.file_handle()) + } + + async fn open_file_internal( + &self, + file_id: u64, + flags: u32, + kind: FileType, + ) -> Result { + let file_node = self.get_file_node(file_id)?; + + let mut file = { + match kind { + FileType::Directory => { + self.fs + .open_dir(&file_node.file_name, OpenFileFlags(flags)) + .await? + } + FileType::RegularFile => { + self.fs + .open_file(&file_node.file_name, OpenFileFlags(flags)) + .await? + } + _ => return Err(Errno::from(libc::EINVAL)), + } + }; + file.set_inode(file_node.parent_file_id, file_id); + let file = self.opened_file_manager.put_file(file); + let file = file.lock().await; + Ok(file.file_handle()) + } +} + +#[async_trait] +impl RawFileSystem for SimpleFileSystem { + async fn init(&self) { + self.file_node_manager.write().unwrap().insert( + DefaultFileSystemMetadata::ROOT_DIR_PARENT_FILE_ID, + DefaultFileSystemMetadata::ROOT_DIR_FILE_ID, + DefaultFileSystemMetadata::ROOT_DIR_NAME, + ); + self.fs.init().await + } + + async fn get_file_path(&self, file_id: u64) -> String { + self.get_file_node(file_id) + .map(|x| x.file_name) + .unwrap_or_else(|_| "".to_string()) + } + + async fn valid_file_id(&self, _file_id: u64, fh: u64) -> Result<()> { + let file_id = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))? + .lock() + .await + .file_stat + .inode; + + (file_id == _file_id) + .then(|| ()) + .ok_or(Errno::from(libc::EBADF)) + } + + async fn stat(&self, file_id: u64) -> Result { + let file_node = self.get_file_node(file_id)?; + let mut stat = self.fs.stat(&file_node.file_name).await?; + stat.set_inode(file_node.parent_file_id, file_node.file_id); + Ok(stat) + } + + async fn lookup(&self, parent_file_id: u64, name: &str) -> Result { + let parent_file_node = self.get_file_node(parent_file_id)?; + let mut stat = self.fs.lookup(&parent_file_node.file_name, name).await?; + self.fill_file_node_id(&mut stat, parent_file_id); + Ok(stat) + } + + async fn read_dir(&self, file_id: u64) -> Result> { + let file_node = self.get_file_node(file_id)?; + let mut files = self.fs.read_dir(&file_node.file_name).await?; + for file in files.iter_mut() { + self.fill_file_node_id(file, file_node.file_id); + } + Ok(files) + } + + async fn open_file(&self, file_id: u64, flags: u32) -> Result { + self.open_file_internal(file_id, flags, FileType::RegularFile) + .await + } + + async fn open_dir(&self, file_id: u64, flags: u32) -> Result { + self.open_file_internal(file_id, flags, FileType::Directory) + .await + } + + async fn create_file(&self, parent_file_id: u64, name: &str, flags: u32) -> Result { + self.create_file_internal(parent_file_id, name, flags, FileType::RegularFile) + .await + } + + async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result { + self.create_file_internal(parent_file_id, name, 0, FileType::Directory) + .await + } + + async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + let file_node = self.get_file_node(file_id)?; + self.fs + .set_attr(&file_node.file_name, file_stat, true) + .await + } + + async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()> { + let parent_file_node = self.get_file_node(parent_file_id)?; + self.fs + .remove_file(&parent_file_node.file_name, name) + .await?; + + { + let mut file_id_manager = self.file_node_manager.write().unwrap(); + file_id_manager.remove(&join_file_path(&parent_file_node.file_name, name)); + } + Ok(()) + } + + async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()> { + let parent_file_node = self.get_file_node(parent_file_id)?; + self.fs + .remove_dir(&parent_file_node.file_name, name) + .await?; + + { + let mut file_id_manager = self.file_node_manager.write().unwrap(); + file_id_manager.remove(&join_file_path(&parent_file_node.file_name, name)); + } + Ok(()) + } + + async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> { + let file = self + .opened_file_manager + .remove_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut file = file.lock().await; + file.close().await?; + Ok(()) + } + + async fn read(&self, file_id: u64, fh: u64, offset: u64, size: u32) -> Result { + let mut file_stat: FileStat; + let data = { + let mut opened_file = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut opened_file = opened_file.lock().await; + file_stat = opened_file.file_stat.clone(); + opened_file.read(offset, size).await + }; + + self.fs.set_attr(&file_stat.path, &file_stat, false).await?; + + data + } + + async fn write(&self, file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result { + let (len, file_stat) = { + let opened_file = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut opened_file = opened_file.lock().await; + let len = opened_file.write(offset, data).await; + (len, opened_file.file_stat.clone()) + }; + + self.fs.set_attr(&file_stat.path, &file_stat, false).await?; + + len + } +} + +/* +pub struct BasicFileSystem { + // meta is the metadata of the filesystem + meta: RwLock, + + // file_handle_manager is a manager for opened files. + file_handle_manager: RwLock, + + inode_id_generator: AtomicU64, + + fs: Box, +} + +impl BasicFileSystem { + const FILE_STAT_EXPIRE_TIME: i32 = 0; + + pub fn new(fs: Box) -> Self { + Self { + meta: RwLock::new(DefaultFileSystemMetadata::new()), + file_handle_manager: RwLock::new(FileHandleManager::new()), + inode_id_generator: AtomicU64::new(10000), + fs, + } + } + + fn next_inode_id(&self) -> u64 { + self.inode_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + fn get_and_update_file_stat(&self, file_id: u64) -> Result { + let file_stat = self + .meta + .read() + .unwrap() + .get_file(file_id) + .ok_or(Errno::from(libc::ENOENT))?; + + if timestamp_diff_from_now(&file_stat.timestamp) < Self::FILE_STAT_EXPIRE_TIME { + Ok(file_stat) + } else { + match file_stat.kind { + FileType::Directory => self.handle_file_stat_expired(&file_stat), + FileType::RegularFile => self.handle_file_stat_expired(&file_stat), + _ => Err(Errno::from(libc::ENOSYS)), + } + } + } + + fn handle_file_stat_expired(&self, old_file: &FileStat) -> Result { + self.fs.stat(&old_file.path).map(|new_stat| { + self.meta + .write() + .unwrap() + .update_file(old_file.inode, &new_stat); + new_stat + }) + } + + fn handle_dir_stat_expired(&self, old_dir: &FileStat) -> Result { + self.handle_file_stat_expired(old_dir); + let childs = self.fs.read_dir(&old_dir.path)?; + todo!() + } +} + +#[async_trait] +impl RawFileSystem for BasicFileSystem { + async fn init(&self) { + let mut meta = self.meta.write().unwrap(); + meta.init_root_dir(); + } + + async fn get_file_path(&self, file_id: u64) -> String { + let meta = self.meta.read().unwrap(); + meta.get_file_path(file_id) + } + + async fn get_opened_file(&self, _file_id: u64, fh: u64) -> Result { + let file_handle_map = self.file_handle_manager.read().unwrap(); + file_handle_map + .get_file(fh) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn stat(&self, file_id: u64) -> Result { + self.get_and_update_file_stat(file_id) + } + + async fn lookup(&self, parent_file_id: u64, name: &str) -> Result { + self.get_and_update_file_stat(parent_file_id)?; + self.meta + .read() + .unwrap() + .find_file(parent_file_id, name) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn read_dir(&self, file_id: u64) -> Result> { + self.get_and_update_file_stat(file_id)?; + let meta = self.meta.read().unwrap(); + Ok(meta.get_dir_childs(file_id)) + } + + async fn open_file(&self, file_id: u64) -> Result { + let meta = self.meta.read().unwrap(); + let file_stat = meta.get_file(file_id).ok_or(Errno::from(libc::ENOENT))?; + let mut file_handle_map = self.file_handle_manager.write().unwrap(); + let file_handle = file_handle_map.create_file(&file_stat); + Ok(file_handle) + } + + async fn create_file(&self, parent_file_id: u64, name: &str) -> Result { + let dir_stat = self.get_and_update_file_stat(parent_file_id)?; + + let mut file_stat = self.fs.create_file(&dir_stat.path, name)?; + file_stat.inode = self.next_inode_id(); + file_stat.parent_inode = parent_file_id; + + let mut meta = self.meta.write().unwrap(); + meta.put_file(&file_stat); + + let mut file_handle_map = self.file_handle_manager.write().unwrap(); + let file_handle = file_handle_map.create_file(&file_stat); + + Ok(file_handle) + } + + async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result { + let dir_stat = self.get_and_update_file_stat(parent_file_id)?; + + let mut file_stat = self.fs.create_dir(&dir_stat.path, name)?; + file_stat.parent_inode = parent_file_id; + + let mut meta = self.meta.write().unwrap(); + meta.put_dir(&file_stat); + + let mut file_handle_map = self.file_handle_manager.write().unwrap(); + let file_handle = file_handle_map.create_file(&file_stat); + Ok(file_handle) + } + + async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + let mut meta = self.meta.write().unwrap(); + meta.update_file(file_id, file_stat); + self.fs.set_attr(&file_stat.name, file_stat, true) + } + + async fn update_file_status(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + let mut meta = self.meta.write().unwrap(); + meta.update_file(file_id, file_stat); + self.fs.set_attr(&file_stat.name, file_stat, false) + } + + async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()> { + let mut meta = self.meta.write().unwrap(); + let dir_stat = meta + .get_file(parent_file_id) + .ok_or(Errno::from(libc::ENOENT))?; + meta.remove_file(parent_file_id, name); + self.fs.remove_file(&dir_stat.path, name) + } + + async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()> { + let mut meta = self.meta.write().unwrap(); + let dir_stat = meta + .get_file(parent_file_id) + .ok_or(Errno::from(libc::ENOENT))?; + meta.remove_dir(parent_file_id, name); + self.fs.remove_dir(&dir_stat.path, name) + } + + async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> { + let mut file_handle_manager = self.file_handle_manager.write().unwrap(); + file_handle_manager.remove_file(fh); + Ok(()) + } + + async fn read(&self, file_id: u64, fh: u64) -> Box { + let file = { + let file_handle_map = self.file_handle_manager.read().unwrap(); + file_handle_map.get_file(fh).unwrap() + }; + + self.fs.read(&file) + } + + async fn write(&self, file_id: u64, fh: u64) -> Box { + let file = { + let file_handle_map = self.file_handle_manager.read().unwrap(); + file_handle_map.get_file(fh).unwrap() + }; + + self.fs.write(&file) + } +} + */ diff --git a/clients/filesystem-fuse/src/filesystem_metadata.rs b/clients/filesystem-fuse/src/filesystem_metadata.rs new file mode 100644 index 00000000000..dd1985aa851 --- /dev/null +++ b/clients/filesystem-fuse/src/filesystem_metadata.rs @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::{FileStat, Result}; +use crate::utils::join_file_path; +use std::collections::HashMap; + +/// DefaultFileSystemMetadata is a simple implementation of FileSystemMetadata +/// that stores file metadata in memory. +pub struct DefaultFileSystemMetadata { + // file_stat_map stores the metadata of all files. + file_stat_map: HashMap, + + // dir_name_map stores the inode of a directory by its full path of mount point. + dir_name_map: HashMap, + + // dir_child_map stores the children of a directory. + dir_child_map: HashMap>, +} + +impl DefaultFileSystemMetadata { + pub const ROOT_DIR_PARENT_FILE_ID: u64 = 0; + pub const ROOT_DIR_NAME: &'static str = ""; + pub const ROOT_DIR_FILE_ID: u64 = 1; + pub const FS_META_FILE_NAME: &'static str = ".gvfs_meta"; + + pub fn new() -> Self { + Self { + file_stat_map: Default::default(), + dir_name_map: Default::default(), + dir_child_map: Default::default(), + } + } +} + +impl DefaultFileSystemMetadata { + pub(crate) fn get_file_path(&self, file_id: u64) -> String { + self.file_stat_map + .get(&file_id) + .map(|x| x.path.clone()) + .unwrap_or_else(|| "".to_string()) + } + + pub(crate) fn put_file(&mut self, file_stat: &FileStat) { + self.file_stat_map + .insert(file_stat.inode, FileStat::clone(&file_stat)); + self.dir_child_map + .entry(file_stat.parent_inode) + .or_insert(HashMap::new()) + .insert(file_stat.name.clone(), file_stat.inode); + } + + pub(crate) fn init_root_dir(&mut self) -> FileStat { + let mut file_stat = FileStat::new_dir("", ""); + file_stat.inode = DefaultFileSystemMetadata::ROOT_DIR_FILE_ID; + file_stat.parent_inode = DefaultFileSystemMetadata::ROOT_DIR_PARENT_FILE_ID; + + self.file_stat_map + .insert(file_stat.inode, FileStat::clone(&file_stat)); + self.dir_child_map.insert(file_stat.inode, HashMap::new()); + + self.dir_name_map + .insert(file_stat.name.clone(), file_stat.inode); + file_stat + } + + pub(crate) fn put_dir(&mut self, file_stat: &FileStat) { + self.file_stat_map + .insert(file_stat.inode, FileStat::clone(&file_stat)); + self.dir_child_map.insert(file_stat.inode, HashMap::new()); + self.dir_child_map + .get_mut(&file_stat.parent_inode) + .unwrap() + .insert(file_stat.name.clone(), file_stat.inode); + + let _ = self + .file_stat_map + .get(&file_stat.parent_inode) + .unwrap() + .clone(); + self.dir_name_map + .insert(file_stat.path.clone(), file_stat.inode); + } + + pub(crate) fn get_file(&self, inode: u64) -> Option { + self.file_stat_map.get(&inode).map(|x| FileStat::clone(x)) + } + + pub(crate) fn find_file(&self, parent_inode: u64, name: &str) -> Option { + self.dir_child_map + .get(&parent_inode) + .and_then(|x| x.get(name)) + .and_then(|x| self.get_file(*x)) + } + + pub(crate) fn get_dir_childs(&self, inode: u64) -> Vec { + self.dir_child_map + .get(&inode) + .map(|child_map| { + child_map + .iter() + .filter_map(|entry| self.file_stat_map.get(entry.1).map(|stat| stat.clone())) + .collect() + }) + .unwrap_or_else(Vec::new) + } + + pub(crate) fn update_file(&mut self, file_id: u64, file_stat: &FileStat) { + self.file_stat_map + .insert(file_id, FileStat::clone(file_stat)); + } + + pub(crate) fn remove_file(&mut self, parent_file_id: u64, name: &str) -> Result<()> { + let dir_map = self.dir_child_map.get_mut(&parent_file_id); + if let Some(dir_map) = dir_map { + if let Some(file_id) = dir_map.remove(name) { + self.file_stat_map.remove(&file_id); + Ok(()) + } else { + Err(libc::ENOENT.into()) + } + } else { + Err(libc::ENOENT.into()) + } + } + + pub(crate) fn remove_dir(&mut self, parent_file_id: u64, name: &str) -> Result<()> { + if let Some(dir_file) = self.find_file(parent_file_id, name) { + if let Some(dir_child_map) = self.dir_child_map.get_mut(&dir_file.inode) { + if dir_child_map.is_empty() { + self.dir_child_map.remove(&dir_file.inode); + self.file_stat_map.remove(&dir_file.inode); + let full_name = join_file_path(&dir_file.path, name); + self.dir_name_map.remove(&full_name); + return Ok(()); + } else { + return Err(libc::ENOTEMPTY.into()); + } + } + } + Err(libc::ENOENT.into()) + } +} diff --git a/clients/filesystem-fuse/src/fuse_api_handle.rs b/clients/filesystem-fuse/src/fuse_api_handle.rs new file mode 100644 index 00000000000..b735cb38058 --- /dev/null +++ b/clients/filesystem-fuse/src/fuse_api_handle.rs @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::filesystem::{FileStat, FileSystemContext, RawFileSystem, SimpleFileSystem}; +use fuse3::path::prelude::{ReplyData, ReplyOpen, ReplyStatFs, ReplyWrite}; +use fuse3::path::Request; +use fuse3::raw::prelude::{ + FileAttr, ReplyAttr, ReplyCreated, ReplyDirectory, ReplyDirectoryPlus, ReplyEntry, ReplyInit, +}; +use fuse3::raw::reply::{DirectoryEntry, DirectoryEntryPlus}; +use fuse3::raw::Filesystem; +use fuse3::FileType::{Directory, RegularFile}; +use fuse3::{Errno, FileType, Inode, SetAttr, Timestamp}; +use futures_util::stream::BoxStream; +use futures_util::StreamExt; +use futures_util::{stream, FutureExt}; +use std::ffi::{OsStr, OsString}; +use std::num::NonZeroU32; +use std::time::{Duration, SystemTime}; + +pub(crate) struct FuseApiHandle { + local_fs: T, + default_ttl: Duration, + fs_context: FileSystemContext, +} + +impl FuseApiHandle { + pub fn new(fs: T, context: FileSystemContext) -> Self { + FuseApiHandle { + local_fs: fs, + default_ttl: Duration::from_secs(1), + fs_context: context, + } + } + + pub async fn get_file_path(&self, inode: u64) -> String { + self.local_fs.get_file_path(inode).await + } + + async fn get_modified_file_stat( + &self, + inode: u64, + size: Option, + atime: Option, + mtime: Option, + ) -> Result { + let file_stat = self.local_fs.stat(inode).await?; + let mut nf = FileStat::clone(&file_stat); + size.map(|size| { + nf.size = size; + }); + atime.map(|atime| { + nf.atime = atime; + }); + mtime.map(|mtime| { + nf.mtime = mtime; + }); + Ok(nf) + } +} + +impl Filesystem for FuseApiHandle { + async fn init(&self, _req: Request) -> fuse3::Result { + self.local_fs.init().await; + Ok(ReplyInit { + max_write: NonZeroU32::new(16 * 1024).unwrap(), + }) + } + + async fn destroy(&self, _req: Request) {} + + async fn lookup( + &self, + _req: Request, + parent: Inode, + name: &OsStr, + ) -> fuse3::Result { + let file_stat = self.local_fs.lookup(parent, name.to_str().unwrap()).await?; + Ok(ReplyEntry { + ttl: self.default_ttl, + attr: fstat_to_file_attr(&file_stat, &self.fs_context), + generation: 0, + }) + } + + async fn getattr( + &self, + _req: Request, + inode: Inode, + fh: Option, + _flags: u32, + ) -> fuse3::Result { + // check the opened file inode is the same as the inode + if let Some(fh) = fh { + self.local_fs.valid_file_id(inode, fh).await?; + } + + let file_stat = self.local_fs.stat(inode).await?; + Ok(ReplyAttr { + ttl: self.default_ttl, + attr: fstat_to_file_attr(&file_stat, &self.fs_context), + }) + } + + async fn setattr( + &self, + req: Request, + inode: Inode, + fh: Option, + set_attr: SetAttr, + ) -> fuse3::Result { + let new_file_stat = self + .get_modified_file_stat(inode, set_attr.size, set_attr.atime, set_attr.mtime) + .await?; + let attr = fstat_to_file_attr(&new_file_stat, &self.fs_context); + self.local_fs.set_attr(inode, &new_file_stat).await?; + Ok(ReplyAttr { + ttl: self.default_ttl, + attr: attr, + }) + } + + async fn mkdir( + &self, + req: Request, + parent: Inode, + name: &OsStr, + mode: u32, + umask: u32, + ) -> fuse3::Result { + let handle_id = self + .local_fs + .create_dir(parent, name.to_str().unwrap()) + .await?; + Ok(ReplyEntry { + ttl: self.default_ttl, + attr: dummy_file_attr( + handle_id.file_id, + Directory, + Timestamp::from(SystemTime::now()), + &self.fs_context, + ), + generation: 0, + }) + } + + async fn unlink(&self, req: Request, parent: Inode, name: &OsStr) -> fuse3::Result<()> { + let result = self + .local_fs + .remove_file(parent, name.to_str().unwrap()) + .await?; + Ok(()) + } + + async fn rmdir(&self, req: Request, parent: Inode, name: &OsStr) -> fuse3::Result<()> { + self.local_fs + .remove_dir(parent, name.to_str().unwrap()) + .await?; + Ok(()) + } + + async fn open(&self, req: Request, inode: Inode, flags: u32) -> fuse3::Result { + let file_handle = self.local_fs.open_file(inode, flags).await?; + Ok(ReplyOpen { + fh: file_handle.handle_id, + flags: flags, + }) + } + + async fn read( + &self, + req: Request, + inode: Inode, + fh: u64, + offset: u64, + size: u32, + ) -> fuse3::Result { + let data = self.local_fs.read(inode, fh, offset, size).await?; + Ok(ReplyData { data: data }) + } + + async fn write( + &self, + req: Request, + inode: Inode, + fh: u64, + offset: u64, + data: &[u8], + write_flags: u32, + flags: u32, + ) -> fuse3::Result { + let mut written = self.local_fs.write(inode, fh, offset, data).await?; + Ok(ReplyWrite { written: written }) + } + + async fn statfs(&self, req: Request, inode: Inode) -> fuse3::Result { + Ok(ReplyStatFs { + blocks: 1000000, + bfree: 1000000, + bavail: 1000000, + files: 1000000, + ffree: 1000000, + bsize: 4096, + namelen: 255, + frsize: 4096, + }) + } + + async fn release( + &self, + req: Request, + inode: Inode, + fh: u64, + flags: u32, + lock_owner: u64, + flush: bool, + ) -> fuse3::Result<()> { + self.local_fs.close_file(inode, fh).await + } + + async fn opendir(&self, req: Request, inode: Inode, flags: u32) -> fuse3::Result { + let file_handle = self.local_fs.open_dir(inode, flags).await?; + Ok(ReplyOpen { + fh: file_handle.handle_id, + flags: flags, + }) + } + + type DirEntryStream<'a> + = BoxStream<'a, fuse3::Result> + where + T: 'a; + + async fn readdir<'a>( + &'a self, + req: Request, + parent: Inode, + fh: u64, + offset: i64, + ) -> fuse3::Result>> { + let current = self.local_fs.stat(parent).await?; + let files = self.local_fs.read_dir(parent).await?; + let entries_stream = + stream::iter(files.into_iter().enumerate().map(|(index, file_stat)| { + Ok(DirectoryEntry { + inode: file_stat.inode, + name: file_stat.name.clone().into(), + kind: file_stat.kind, + offset: (index + 3) as i64, + }) + })); + + let mut relative_paths = stream::iter([ + Ok(DirectoryEntry { + inode: current.inode, + name: ".".into(), + kind: Directory, + offset: 1, + }), + Ok(DirectoryEntry { + inode: current.parent_inode, + name: "..".into(), + kind: Directory, + offset: 2, + }), + ]); + + let combined_stream = relative_paths.chain(entries_stream); + Ok(ReplyDirectory { + entries: combined_stream.skip(offset as usize).boxed(), + }) + } + + async fn releasedir( + &self, + req: Request, + inode: Inode, + fh: u64, + flags: u32, + ) -> fuse3::Result<()> { + self.local_fs.close_file(inode, fh).await + } + + async fn create( + &self, + req: Request, + parent: Inode, + name: &OsStr, + mode: u32, + flags: u32, + ) -> fuse3::Result { + let file_handle = self + .local_fs + .create_file(parent, name.to_str().unwrap(), flags) + .await?; + Ok(ReplyCreated { + ttl: self.default_ttl, + attr: dummy_file_attr( + file_handle.file_id, + RegularFile, + Timestamp::from(SystemTime::now()), + &self.fs_context, + ), + generation: 0, + fh: file_handle.handle_id, + flags: flags, + }) + } + + type DirEntryPlusStream<'a> + = BoxStream<'a, fuse3::Result> + where + T: 'a; + + async fn readdirplus<'a>( + &'a self, + req: Request, + parent: Inode, + fh: u64, + offset: u64, + lock_owner: u64, + ) -> fuse3::Result>> { + let current = self.local_fs.stat(parent).await?; + let files = self.local_fs.read_dir(parent).await?; + let entries_stream = + stream::iter(files.into_iter().enumerate().map(|(index, file_stat)| { + Ok(DirectoryEntryPlus { + inode: file_stat.inode, + name: file_stat.name.clone().into(), + kind: file_stat.kind, + offset: (index + 3) as i64, + attr: fstat_to_file_attr(&file_stat, &self.fs_context), + generation: 0, + entry_ttl: self.default_ttl, + attr_ttl: self.default_ttl, + }) + })); + + let mut relative_paths = stream::iter([ + Ok(DirectoryEntryPlus { + inode: current.inode, + name: OsString::from("."), + kind: Directory, + offset: 1, + attr: fstat_to_file_attr(¤t, &self.fs_context), + generation: 0, + entry_ttl: self.default_ttl, + attr_ttl: self.default_ttl, + }), + Ok(DirectoryEntryPlus { + inode: current.parent_inode, + name: OsString::from(".."), + kind: Directory, + offset: 2, + attr: dummy_file_attr( + current.parent_inode, + Directory, + Timestamp::from(SystemTime::now()), + &self.fs_context, + ), + generation: 0, + entry_ttl: self.default_ttl, + attr_ttl: self.default_ttl, + }), + ]); + + let combined_stream = relative_paths.chain(entries_stream); + Ok(ReplyDirectoryPlus { + entries: combined_stream.skip(offset as usize).boxed(), + }) + } +} + +const fn fstat_to_file_attr(file_st: &FileStat, context: &FileSystemContext) -> FileAttr { + FileAttr { + ino: file_st.inode, + size: file_st.size, + blocks: 1, + atime: file_st.atime, + mtime: file_st.mtime, + ctime: file_st.ctime, + kind: file_st.kind, + perm: file_st.perm, + nlink: file_st.nlink, + uid: context.uid, + gid: context.gid, + rdev: 0, + blksize: 0, + #[cfg(target_os = "macos")] + crtime: file_st.ctime, + #[cfg(target_os = "macos")] + flags: 0, + } +} + +const fn dummy_file_attr( + inode: u64, + kind: FileType, + now: Timestamp, + fs_context: &FileSystemContext, +) -> FileAttr { + FileAttr { + ino: inode, + size: 0, + blocks: 0, + atime: now, + mtime: now, + ctime: now, + kind, + perm: fuse3::perm_from_mode_and_kind(kind, 0o775), + nlink: 0, + uid: fs_context.uid, + gid: fs_context.gid, + rdev: 0, + blksize: 4096, + #[cfg(target_os = "macos")] + crtime: now, + #[cfg(target_os = "macos")] + flags: 0, + } +} diff --git a/clients/filesystem-fuse/src/fuse_server.rs b/clients/filesystem-fuse/src/fuse_server.rs new file mode 100644 index 00000000000..dc935ed899c --- /dev/null +++ b/clients/filesystem-fuse/src/fuse_server.rs @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::cloud_storage_filesystem::CloudStorageFileSystem; +use crate::config::{Config, FilesystemConfig, FuseConfig, GravitinoConfig}; +use crate::filesystem::{FileSystemContext, SimpleFileSystem}; +use crate::fuse_api_handle::FuseApiHandle; +use crate::gravitino_filesystem::GravitinoFileSystem; +use crate::log_fuse_api_handle::LogFuseApiHandle; +use crate::memory_filesystem::MemoryFileSystem; +use fuse3::raw::{MountHandle, Session}; +use fuse3::{MountOptions, Result}; +use log::{error, info}; +use opendal::layers::LoggingLayer; +use opendal::{services, Operator}; +use serde::__private::ser::constrain; +use std::process::exit; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{Mutex, Notify}; +use tokio::time::timeout; + +/// Represents a FUSE server capable of starting and stopping the FUSE filesystem. +pub struct FuseServer { + // Notification for stop + close_notify: Arc, + + // Shared handle to manage FUSE unmounting + mount_handle: Arc>>, // Shared handle to manage FUSE unmounting + + // Mount point of the FUSE filesystem + mount_point: String, +} + +impl FuseServer { + /// Creates a new instance of `FuseServer`. + pub fn new(mount_point: &str) -> Self { + Self { + close_notify: Arc::new(Notify::new()), + mount_handle: Arc::new(Mutex::new(None)), + mount_point: mount_point.to_string(), + } + } + + /// Starts the FUSE filesystem and blocks until it is stopped. + pub async fn start(&self, config: &Config) -> Result<()> { + let uid = unsafe { libc::getuid() }; + let gid = unsafe { libc::getgid() }; + let fs_context = FileSystemContext { uid: uid, gid: gid }; + + let gvfs = GravitinoFileSystem::new(&config, &fs_context).await; + let fs = SimpleFileSystem::new(gvfs); + + //let fs = SimpleFileSystem::new(MemoryFileSystem::new()); + let fuse_fs = LogFuseApiHandle::new(FuseApiHandle::new(fs, fs_context)); + //let fuse_fs = FuseApiHandle::new(fs, fs_context); + + //check if the mount point exists + if !std::path::Path::new(&self.mount_point).exists() { + error!("Mount point {} does not exist", self.mount_point); + exit(libc::ENOENT); + } + + info!( + "Starting FUSE filesystem and mounting at {}", + self.mount_point + ); + + let mount_options = MountOptions::default(); + let mount_handle = Session::new(mount_options) + .mount_with_unprivileged(fuse_fs, &self.mount_point) + .await?; + + { + let mut handle_guard = self.mount_handle.lock().await; + *handle_guard = Some(mount_handle); + } + + // Wait for stop notification + self.close_notify.notified().await; + + info!("Received stop notification, FUSE filesystem will be unmounted."); + Ok(()) + } + + /// Stops the FUSE filesystem and waits for unmounting to complete. + pub async fn stop(&self) -> Result<()> { + // Notify stop + self.close_notify.notify_one(); + + info!("Stopping FUSE filesystem..."); + let timeout_duration = Duration::from_secs(5); + + let handle = { + let mut handle_guard = self.mount_handle.lock().await; + handle_guard.take() // Take the handle out to unmount + }; + + if let Some(mount_handle) = handle { + let res = timeout(timeout_duration, mount_handle.unmount()).await; + + match res { + Ok(Ok(())) => { + info!("FUSE filesystem unmounted successfully."); + Ok(()) + } + Ok(Err(e)) => { + error!("Failed to unmount FUSE filesystem: {:?}", e); + Err(e.into()) + } + Err(_) => { + error!("Unmount timed out."); + Err(libc::ETIMEDOUT.into()) + } + } + } else { + error!("No active mount handle to unmount."); + Err(libc::EBADF.into()) + } + } +} diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs new file mode 100644 index 00000000000..2368dea3dd4 --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_client.rs @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::GravitinoConfig; +use crate::error::{ErrorCode, GravitinoError}; +use reqwest::Client; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt::{Debug, Display}; +use urlencoding::encode; + +#[derive(Debug, Deserialize)] +pub(crate) struct Fileset { + pub(crate) name: String, + #[serde(rename = "type")] + pub(crate) fileset_type: String, + comment: String, + #[serde(rename = "storageLocation")] + pub(crate) storage_location: String, + properties: HashMap, +} + +#[derive(Debug, Deserialize)] +struct FilesetResponse { + code: i32, + fileset: Fileset, +} + +#[derive(Debug, Deserialize)] +struct FileLocationResponse { + code: i32, + #[serde(rename = "fileLocation")] + location: String, +} + +pub(crate) struct GravitinoClient { + gravitino_uri: String, + metalake: String, + + http_client: Client, +} + +impl GravitinoClient { + pub fn new(config: &GravitinoConfig) -> Self { + Self { + gravitino_uri: config.gravitino_url.clone(), + metalake: config.metalake.clone(), + http_client: Client::new(), + } + } + + pub fn init(&self) {} + + pub fn do_post(&self, path: &str, data: &str) { + println!("POST request to {} with data: {}", path, data); + } + + pub fn do_get(&self, path: &str) { + println!("GET request to {}", path); + } + + pub fn request(&self, path: &str, data: &str) -> Result<(), GravitinoError> { + todo!() + } + + pub fn list_schema(&self) -> Result<(), GravitinoError> { + todo!() + } + + pub fn list_fileset(&self) -> Result<(), GravitinoError> { + todo!() + } + + fn get_fileset_url(&self, catalog_name: &str, schema_name: &str, fileset_name: &str) -> String { + format!( + "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}", + self.gravitino_uri, self.metalake, catalog_name, schema_name, fileset_name + ) + } + + async fn send_and_parse(&self, url: &str) -> Result + where + T: for<'de> Deserialize<'de>, + { + let http_resp = self.http_client.get(url).send().await.map_err(|e| { + GravitinoError::RestError(format!("Failed to send request to {}", url), e) + })?; + + let res = http_resp.json::().await.map_err(|e| { + GravitinoError::RestError(format!("Failed to parse response from {}", url), e) + })?; + + Ok(res) + } + + pub async fn get_fileset( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + ) -> Result { + let url = self.get_fileset_url(catalog_name, schema_name, fileset_name); + let res = self.send_and_parse::(&url).await?; + + if res.code != 0 { + return Err(GravitinoError::Error( + ErrorCode::GravitinoClientError, + "Failed to get fileset".to_string(), + )); + } + Ok(res.fileset) + } + + pub fn get_file_location_url( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + path: &str, + ) -> String { + let encoded_path = encode(path); + format!( + "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}", + self.gravitino_uri, self.metalake, catalog_name, schema_name, fileset_name, path + ) + } + + pub async fn get_file_location( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + path: &str, + ) -> Result { + let url = self.get_file_location_url(catalog_name, schema_name, fileset_name, path); + let res = self.send_and_parse::(&url).await?; + + if res.code != 0 { + return Err(GravitinoError::Error( + ErrorCode::GravitinoClientError, + "Failed to get file location".to_string(), + )); + } + Ok(res.location) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::mock; + use tokio; + + #[tokio::test] + async fn test_get_fileset_success() { + tracing_subscriber::fmt::init(); + let fileset_response = r#" + { + "code": 0, + "fileset": { + "name": "example_fileset", + "type": "example_type", + "comment": "This is a test fileset", + "storageLocation": "/example/path", + "properties": { + "key1": "value1", + "key2": "value2" + } + } + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!( + "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}", + "test", "catalog1", "schema1", "fileset1" + ); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(fileset_response) + .create(); + + let config = GravitinoConfig { + gravitino_url: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client.get_fileset("catalog1", "schema1", "fileset1").await; + + match result { + Ok(fileset) => { + assert_eq!(fileset.name, "example_fileset"); + assert_eq!(fileset.fileset_type, "example_type"); + assert_eq!(fileset.storage_location, "/example/path"); + assert_eq!(fileset.properties.get("key1"), Some(&"value1".to_string())); + } + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } + + #[tokio::test] + async fn test_get_file_location_success() { + tracing_subscriber::fmt::init(); + let file_location_response = r#" + { + "code": 0, + "fileLocation": "/mybucket/a" + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!( + "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}", + "test", "catalog1", "schema1", "fileset1", "/example/path" + ); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(file_location_response) + .create(); + + let config = GravitinoConfig { + gravitino_url: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client + .get_file_location("catalog1", "schema1", "fileset1", "/example/path") + .await; + + match result { + Ok(location) => { + assert_eq!(location, "/mybucket/a"); + } + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } + + #[tokio::test] + async fn test1() { + tracing_subscriber::fmt::init(); + let config = GravitinoConfig { + gravitino_url: "http://localhost:8090".to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + client.init(); + let result= client.get_fileset("c1", "s1", "fileset1").await; + if let Err(e) = &result { + println!("{:?}", e); + } + + let fileset= result.unwrap(); + println!("{:?}", fileset); + assert!(fileset.name == "fileset1"); + } +} diff --git a/clients/filesystem-fuse/src/gravitino_compose_filesystem.rs b/clients/filesystem-fuse/src/gravitino_compose_filesystem.rs new file mode 100644 index 00000000000..bce5c890ce0 --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_compose_filesystem.rs @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::{ + FileReader, FileStat, FileSystemCapacity, FileWriter, OpenFileFlags, OpenedFile, + PathFileSystem, Result, +}; +use crate::gravitino_client::GravitinoClient; +use crate::storage_filesystem::StorageFileSystem; +use async_trait::async_trait; +use bytes::Bytes; +use dashmap::DashMap; + +pub(crate) struct GravitinoComposeFileSystem { + // meta is the metadata of the filesystem + client: GravitinoClient, + inner_fs: DashMap, +} + +impl GravitinoComposeFileSystem { + pub fn new(client: GravitinoClient) -> Self { + Self { + client, + inner_fs: Default::default(), + } + } +} + +#[async_trait] +impl PathFileSystem for GravitinoComposeFileSystem { + async fn init(&self) { + todo!() + } + + async fn stat(&self, name: &str) -> Result { + todo!() + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + todo!() + } + + async fn read_dir(&self, name: &str) -> Result> { + todo!() + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + todo!() + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + todo!() + } + + async fn create_file( + &self, + parent: &str, + name: &str, + flags: OpenFileFlags, + ) -> Result { + todo!() + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + todo!() + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + todo!() + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + todo!() + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + todo!() + } + + fn get_capacity(&self) -> Result { + todo!() + } +} + +struct FileReaderImpl { + reader: opendal::Reader, +} + +#[async_trait] +impl FileReader for FileReaderImpl { + async fn read(&mut self, offset: u64, size: u32) -> Result { + todo!() + } +} + +struct FileWriterImpl { + writer: opendal::Writer, +} + +#[async_trait] +impl FileWriter for FileWriterImpl { + async fn write(&mut self, offset: u64, data: &[u8]) -> Result { + todo!() + } + + async fn close(&mut self) -> Result<()> { + todo!() + } +} diff --git a/clients/filesystem-fuse/src/gravitino_filesystem.rs b/clients/filesystem-fuse/src/gravitino_filesystem.rs new file mode 100644 index 00000000000..28c254e4dfb --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_filesystem.rs @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::Config; +use crate::filesystem::{ + FileStat, FileSystemCapacity, FileSystemContext, OpenFileFlags, OpenedFile, PathFileSystem, + Result, +}; +use crate::gravitino_client::GravitinoClient; +use crate::storage_filesystem::StorageFileSystem; +use crate::utils::{extract_fileset, extract_storage_filesystem}; +use async_trait::async_trait; + +pub(crate) struct GravitinoFileSystemConfig {} + +pub(crate) struct GravitinoFileSystem { + fs: StorageFileSystem, + client: GravitinoClient, + fileset_location: String, +} + +impl GravitinoFileSystem { + pub async fn new(config: &Config, context: &FileSystemContext) -> Self { + let client = GravitinoClient::new(&config.gravitino); + let fileset_location = config.fuse.mount_from.clone(); + let (catalog, schema, fileset) = extract_fileset(&fileset_location).unwrap(); + let fileset = client + .get_fileset(&catalog, &schema, &fileset) + .await + .unwrap(); + + let (schema, location) = extract_storage_filesystem(&fileset.storage_location).unwrap(); + let fs = StorageFileSystem::new(&schema, &config, &context) + .await + .unwrap(); + + Self { + fs: fs, + client: client, + fileset_location: location, + } + } + + fn map_fileset_location(&self, name: &str) -> String { + format!("{}/{}", self.fileset_location, name) + } +} + +#[async_trait] +impl PathFileSystem for GravitinoFileSystem { + async fn init(&self) { + self.fs.init().await; + } + + async fn stat(&self, name: &str) -> Result { + let name = self.map_fileset_location(name); + self.fs.stat(&name).await + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + let parent = self.map_fileset_location(parent); + self.fs.lookup(&parent, name).await + } + + async fn read_dir(&self, name: &str) -> Result> { + let name = self.map_fileset_location(name); + self.fs.read_dir(&name).await + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + let name = self.map_fileset_location(name); + self.fs.open_file(&name, flags).await + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + let name = self.map_fileset_location(name); + self.fs.open_dir(&name, flags).await + } + + async fn create_file( + &self, + parent: &str, + name: &str, + flags: OpenFileFlags, + ) -> Result { + let parent = self.map_fileset_location(parent); + self.fs.create_file(&parent, name, flags).await + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + let parent = self.map_fileset_location(parent); + self.fs.create_dir(&parent, name).await + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + let name = self.map_fileset_location(name); + self.fs.set_attr(&name, file_stat, flush).await + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + let parent = self.map_fileset_location(parent); + self.fs.remove_file(&parent, name).await + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + let parent = self.map_fileset_location(parent); + self.fs.remove_dir(&parent, name).await + } + + fn get_capacity(&self) -> Result { + self.fs.get_capacity() + } +} diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs new file mode 100644 index 00000000000..a0b2d9c3609 --- /dev/null +++ b/clients/filesystem-fuse/src/lib.rs @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +mod cloud_storage_filesystem; +mod config; +mod error; +mod filesystem; +mod filesystem_metadata; +mod fuse_api_handle; +pub mod fuse_server; +mod gravitino_client; +mod gravitino_compose_filesystem; +mod gravitino_filesystem; +mod log_fuse_api_handle; +mod memory_filesystem; +mod opened_file_manager; +mod storage_filesystem; +mod utils; diff --git a/clients/filesystem-fuse/src/log_fuse_api_handle.rs b/clients/filesystem-fuse/src/log_fuse_api_handle.rs new file mode 100644 index 00000000000..554ccc8231e --- /dev/null +++ b/clients/filesystem-fuse/src/log_fuse_api_handle.rs @@ -0,0 +1,565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::filesystem::{FileStat, FileSystemContext, RawFileSystem}; +use crate::fuse_api_handle::FuseApiHandle; +use fuse3::path::prelude::{ReplyData, ReplyOpen, ReplyStatFs, ReplyWrite}; +use fuse3::path::Request; +use fuse3::raw::prelude::{ + FileAttr, ReplyAttr, ReplyCreated, ReplyDirectory, ReplyDirectoryPlus, ReplyEntry, ReplyInit, +}; +use fuse3::raw::reply::{DirectoryEntry, DirectoryEntryPlus}; +use fuse3::raw::Filesystem; +use fuse3::FileType::{Directory, RegularFile}; +use fuse3::{Errno, FileType, Inode, SetAttr, Timestamp}; +use futures_util::stream::BoxStream; +use futures_util::StreamExt; +use futures_util::{stream, FutureExt}; +use log::{debug, error, info}; +use std::ffi::{OsStr, OsString}; +use std::future::Future; +use std::num::NonZeroU32; +use std::time::{Duration, SystemTime}; + +pub(crate) struct LogFuseApiHandle { + handle: FuseApiHandle, +} + +impl LogFuseApiHandle { + pub fn new(handle: FuseApiHandle) -> Self { + Self { handle: handle } + } +} + +impl Filesystem for LogFuseApiHandle { + async fn init(&self, req: Request) -> fuse3::Result { + debug!( + "INIT req_id={} pid={} [{},{}]", + req.unique, req.pid, req.uid, req.gid + ); + + let result = self.handle.init(req.clone()).await; + match &result { + Ok(reply) => { + debug!( + "INIT reply: req_id={}, max_write={}", + req.unique, reply.max_write + ); + } + Err(e) => { + debug!("INIT failed: req_id={} error={:?}", req.unique, e); + } + } + result + } + + async fn destroy(&self, req: Request) { + debug!( + "DESTROY req_id={} pid={} [{},{}]", + req.unique, req.pid, req.uid, req.gid + ); + self.handle.destroy(req).await; + } + + async fn lookup(&self, req: Request, parent: Inode, name: &OsStr) -> fuse3::Result { + debug!( + "LOOKUP req_id={} pid={} [{}, {}] parent={}({}) name={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy(), + ); + + let result = self.handle.lookup(req.clone(), parent, name).await; + + match &result { + Ok(reply) => { + debug!( + "LOOKUP reply: req_id={}, [ino={} gen={} attr={:?}]", + req.unique, reply.attr.ino, reply.generation, reply.attr + ); + } + Err(e) => { + error!("LOOKUP failed: req_id={}, [error={:?}]", req.unique, e); + } + } + result + } + + async fn getattr( + &self, + req: Request, + inode: Inode, + fh: Option, + flags: u32, + ) -> fuse3::Result { + debug!( + "GETATTR req_id={} pid={} [{}, {}] inode={} fh={:?} flags={}", + req.unique, req.pid, req.uid, req.gid, inode, fh, flags + ); + + let result = self.handle.getattr(req.clone(), inode, fh, flags).await; + + match &result { + Ok(reply) => { + debug!( + "GETATTR reply: req_id={} [attr={:?}]", + req.unique, reply.attr + ); + } + Err(e) => { + error!("GETATTR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn setattr( + &self, + req: Request, + inode: Inode, + fh: Option, + set_attr: SetAttr, + ) -> fuse3::Result { + debug!( + "SETATTR req_id={} pid={} [{}, {}] inode={} fh={:?} set_attr={:?}", + req.unique, req.pid, req.uid, req.gid, inode, fh, set_attr + ); + + let result = self.handle.setattr(req.clone(), inode, fh, set_attr).await; + + match &result { + Ok(reply) => { + debug!( + "SETATTR reply: req_id={} [attr={:?}]", + req.unique, reply.attr + ); + } + Err(e) => { + error!("SETATTR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn mkdir( + &self, + req: Request, + parent: Inode, + name: &OsStr, + mode: u32, + umask: u32, + ) -> fuse3::Result { + debug!( + "MKDIR req_id={} pid={} [{}, {}] parent={}({}) name={} mode={} umask={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy(), + mode, + umask + ); + + let result = self + .handle + .mkdir(req.clone(), parent, name, mode, umask) + .await; + + match &result { + Ok(reply) => { + debug!( + "MKDIR reply: req_id={} [ino={} gen={} attr={:?}]", + req.unique, reply.attr.ino, reply.generation, reply.attr + ); + } + Err(e) => { + error!("MKDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn unlink(&self, req: Request, parent: Inode, name: &OsStr) -> fuse3::Result<()> { + debug!( + "UNLINK req_id={} pid={} [{}, {}] parent={}({}) name={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy() + ); + + let result = self.handle.unlink(req.clone(), parent, name).await; + + match &result { + Ok(_) => { + debug!("UNLINK reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("UNLINK failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn rmdir(&self, req: Request, parent: Inode, name: &OsStr) -> fuse3::Result<()> { + debug!( + "RMDIR req_id={} pid={} [{}, {}] parent={}({}) name={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy() + ); + + let result = self.handle.rmdir(req.clone(), parent, name).await; + + match &result { + Ok(_) => { + debug!("RMDIR reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("RMDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn open(&self, req: Request, inode: Inode, flags: u32) -> fuse3::Result { + debug!( + "OPEN req_id={} pid={} [{}, {}] inode={} flags={}", + req.unique, req.pid, req.uid, req.gid, inode, flags + ); + + let result = self.handle.open(req.clone(), inode, flags).await; + + match &result { + Ok(reply) => { + debug!( + "OPEN reply: req_id={} [fh={} flags={}]", + req.unique, reply.fh, reply.flags + ); + } + Err(e) => { + error!("OPEN failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn read( + &self, + req: Request, + inode: Inode, + fh: u64, + offset: u64, + size: u32, + ) -> fuse3::Result { + debug!( + "READ req_id={} pid={} [{}, {}] inode={} fh={} offset={} size={}", + req.unique, req.pid, req.uid, req.gid, inode, fh, offset, size + ); + + let result = self.handle.read(req.clone(), inode, fh, offset, size).await; + + match &result { + Ok(_) => { + debug!( + "READ reply: req_id={} [status=success size={}]", + req.unique, size + ); + } + Err(e) => { + error!("READ failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn write( + &self, + req: Request, + inode: Inode, + fh: u64, + offset: u64, + data: &[u8], + write_flags: u32, + flags: u32, + ) -> fuse3::Result { + debug!( + "WRITE req_id={} pid={} [{}, {}] inode={} fh={} offset={} size={} write_flags={} flags={}", + req.unique, + req.pid, + req.uid, + req.gid, + inode, + fh, + offset, + data.len(), + write_flags, + flags + ); + + let result = self + .handle + .write(req.clone(), inode, fh, offset, data, write_flags, flags) + .await; + + match &result { + Ok(reply) => { + debug!( + "WRITE reply: req_id={} [written_bytes={}]", + req.unique, reply.written, + ); + } + Err(e) => { + error!("WRITE failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn statfs(&self, req: Request, inode: Inode) -> fuse3::Result { + debug!( + "STATFS req_id={} pid={} [{}, {}] inode={}", + req.unique, req.pid, req.uid, req.gid, inode + ); + + let result = self.handle.statfs(req.clone(), inode).await; + + match &result { + Ok(reply) => { + debug!( + "STATFS reply: req_id={} [blocks={} bfree={} bavail={} files={} ffree={}]", + req.unique, reply.blocks, reply.bfree, reply.bavail, reply.files, reply.ffree + ); + } + Err(e) => { + error!("STATFS failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn release( + &self, + req: Request, + inode: Inode, + fh: u64, + flags: u32, + lock_owner: u64, + flush: bool, + ) -> fuse3::Result<()> { + debug!( + "RELEASE req_id={} pid={} [{}, {}] inode={} fh={} flags={} lock_owner={} flush={}", + req.unique, req.pid, req.uid, req.gid, inode, fh, flags, lock_owner, flush + ); + + let result = self + .handle + .release(req.clone(), inode, fh, flags, lock_owner, flush) + .await; + + match &result { + Ok(_) => { + debug!("RELEASE reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("RELEASE failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn opendir(&self, req: Request, inode: Inode, flags: u32) -> fuse3::Result { + debug!( + "OPENDIR req_id={} pid={} [{}, {}] inode={} flags={}", + req.unique, req.pid, req.uid, req.gid, inode, flags + ); + + let result = self.handle.opendir(req.clone(), inode, flags).await; + + match &result { + Ok(reply) => { + debug!( + "OPENDIR reply: req_id={} [fh={} flags={}]", + req.unique, reply.fh, reply.flags + ); + } + Err(e) => { + error!("OPENDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + type DirEntryStream<'a> + = BoxStream<'a, fuse3::Result> + where + T: 'a; + + async fn readdir<'a>( + &'a self, + req: Request, + parent: Inode, + fh: u64, + offset: i64, + ) -> fuse3::Result>> { + debug!( + "READDIR req_id={} pid={} [{}, {}] parent={} fh={} offset={}", + req.unique, req.pid, req.uid, req.gid, parent, fh, offset + ); + + let result = self.handle.readdir(req.clone(), parent, fh, offset).await; + + match &result { + Ok(_) => { + debug!("READDIR reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("READDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn releasedir( + &self, + req: Request, + inode: Inode, + fh: u64, + flags: u32, + ) -> fuse3::Result<()> { + debug!( + "RELEASEDIR req_id={} pid={} [{}, {}] inode={} fh={} flags={}", + req.unique, req.pid, req.uid, req.gid, inode, fh, flags + ); + + let result = self.handle.releasedir(req.clone(), inode, fh, flags).await; + + match &result { + Ok(_) => { + debug!("RELEASEDIR reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("RELEASEDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn create( + &self, + req: Request, + parent: Inode, + name: &OsStr, + mode: u32, + flags: u32, + ) -> fuse3::Result { + debug!( + "CREATE req_id={} pid={} [{}, {}] parent={}({}) name={} mode={} flags={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy(), + mode, + flags + ); + + let result = self + .handle + .create(req.clone(), parent, name, mode, flags) + .await; + + match &result { + Ok(reply) => { + debug!( + "CREATE reply: req_id={} [ino={} gen={} fh={} flags={}]", + req.unique, reply.attr.ino, reply.generation, reply.fh, reply.flags + ); + } + Err(e) => { + error!("CREATE failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + type DirEntryPlusStream<'a> + = BoxStream<'a, fuse3::Result> + where + T: 'a; + + async fn readdirplus<'a>( + &'a self, + req: Request, + parent: Inode, + fh: u64, + offset: u64, + lock_owner: u64, + ) -> fuse3::Result>> { + debug!( + "READDIRPLUS req_id={} pid={} [{}, {}] parent={} fh={} offset={} lock_owner={}", + req.unique, req.pid, req.uid, req.gid, parent, fh, offset, lock_owner + ); + + let result = self + .handle + .readdirplus(req.clone(), parent, fh, offset, lock_owner) + .await; + + match &result { + Ok(_) => { + debug!("READDIRPLUS reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("READDIRPLUS failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } +} diff --git a/clients/filesystem-fuse/src/main.rs b/clients/filesystem-fuse/src/main.rs index 48b6ab5517e..8a302dfcb86 100644 --- a/clients/filesystem-fuse/src/main.rs +++ b/clients/filesystem-fuse/src/main.rs @@ -16,15 +16,43 @@ * specific language governing permissions and limitations * under the License. */ - -use log::debug; -use log::info; -use std::process::exit; +mod cloud_storage_filesystem; +mod config; +mod error; +mod filesystem; +mod filesystem_metadata; +mod fuse_api_handle; +mod fuse_server; +mod gravitino_client; +mod gravitino_compose_filesystem; +mod gravitino_filesystem; +mod log_fuse_api_handle; +mod memory_filesystem; +mod opened_file_manager; +mod storage_filesystem; +mod utils; +use crate::config::Config; +use crate::fuse_server::FuseServer; +use fuse3::Result; +use log::{debug, info}; +use std::sync::Arc; #[tokio::main] -async fn main() { - tracing_subscriber::fmt().with_env_filter("debug").init(); - info!("Starting filesystem..."); - debug!("Shutdown filesystem..."); - exit(0); +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter("info,gvfs_fuse=debug,fuse3=warn") + .init(); + + debug!("Starting gvfs-fuse server..."); + let server = Arc::new(FuseServer::new("gvfs")); + let clone_server = server.clone(); + + let config = Config::default(); + let v = tokio::spawn(async move { clone_server.start(&config).await }); + + tokio::signal::ctrl_c().await?; + info!("Received Ctrl+C, stopping server..."); + server.stop().await?; + v.await.ok(); + Ok(()) } diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs b/clients/filesystem-fuse/src/memory_filesystem.rs new file mode 100644 index 00000000000..3e4edccc0df --- /dev/null +++ b/clients/filesystem-fuse/src/memory_filesystem.rs @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::{ + FileReader, FileStat, FileSystemCapacity, FileWriter, OpenFileFlags, OpenedFile, + PathFileSystem, Result, +}; +use crate::filesystem_metadata::DefaultFileSystemMetadata; +use crate::utils::join_file_path; +use async_trait::async_trait; +use bytes::Bytes; +use dashmap::DashMap; +use fuse3::FileType::{Directory, RegularFile}; +use fuse3::{Errno, FileType}; +use regex::Regex; +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex, RwLock}; + +// MemoryFileSystem is a simple in-memory filesystem implementation +// It is used for testing purposes + +struct MemoryFile { + kind: FileType, + data: Arc>>, +} + +pub(crate) struct MemoryFileSystem { + // file_map is a map of file name to file size + file_map: RwLock>, +} + +impl MemoryFileSystem { + pub fn new() -> Self { + Self { + file_map: RwLock::new(Default::default()), + } + } + + fn create_file_stat(&self, path: &str, file: &MemoryFile) -> FileStat { + match file.kind { + Directory => FileStat::new_dir_with_path(path), + _ => FileStat::new_file_with_path(path, file.data.lock().unwrap().len() as u64), + } + } +} + +#[async_trait] +impl PathFileSystem for MemoryFileSystem { + async fn init(&self) { + let root = MemoryFile { + kind: Directory, + data: Arc::new(Mutex::new(Vec::new())), + }; + self.file_map.write().unwrap().insert("".to_string(), root); + + let meta = MemoryFile { + kind: RegularFile, + data: Arc::new(Mutex::new(Vec::new())), + }; + self.file_map.write().unwrap().insert( + DefaultFileSystemMetadata::FS_META_FILE_NAME.to_string(), + meta, + ); + } + + async fn stat(&self, name: &str) -> Result { + self.file_map + .read() + .unwrap() + .get(name) + .map(|x| self.create_file_stat(name, x)) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + self.stat(&join_file_path(parent, name)).await + } + + async fn read_dir(&self, name: &str) -> Result> { + let file_map = self.file_map.read().unwrap(); + + let results: Vec = file_map + .iter() + .filter(|x| dir_child_reg_expr(name).is_match(x.0)) + .map(|(k, v)| self.create_file_stat(k, v)) + .collect(); + + Ok(results) + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + let file_stat = self.stat(name).await?; + let mut file = OpenedFile::new(file_stat.clone()); + match file.file_stat.kind { + Directory => Ok(file), + RegularFile => { + let data = self + .file_map + .read() + .unwrap() + .get(&file.file_stat.path) + .unwrap() + .data + .clone(); + file.reader = Some(Box::new(MemoryFileReader { data: data.clone() })); + file.writer = Some(Box::new(MemoryFileWriter { data: data })); + Ok(file) + } + _ => Err(Errno::from(libc::EBADF)), + } + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + self.open_file(name, flags).await + } + + async fn create_file( + &self, + parent: &str, + name: &str, + flags: OpenFileFlags, + ) -> Result { + { + let file_map = self.file_map.read().unwrap(); + if file_map.contains_key(&join_file_path(parent, name)) { + return Err(Errno::from(libc::EEXIST)); + } + }; + + let mut file = OpenedFile::new(FileStat::new_file(parent, name, 0)); + + let data = Arc::new(Mutex::new(Vec::new())); + self.file_map.write().unwrap().insert( + file.file_stat.path.clone(), + MemoryFile { + kind: RegularFile, + data: data.clone(), + }, + ); + file.reader = Some(Box::new(MemoryFileReader { data: data.clone() })); + file.writer = Some(Box::new(MemoryFileWriter { data: data })); + + Ok(file) + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + { + let mut file_map = self.file_map.read().unwrap(); + if file_map.contains_key(&join_file_path(parent, name)) { + return Err(Errno::from(libc::EEXIST)); + } + } + + let file = OpenedFile::new(FileStat::new_dir(parent, name)); + self.file_map.write().unwrap().insert( + file.file_stat.path.clone(), + MemoryFile { + kind: Directory, + data: Arc::new(Mutex::new(Vec::new())), + }, + ); + + Ok(file) + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + Ok(()) + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + let mut file_map = self.file_map.write().unwrap(); + if file_map.remove(&join_file_path(parent, name)).is_none() { + return Err(Errno::from(libc::ENOENT)); + } + Ok(()) + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + let mut file_map = self.file_map.write().unwrap(); + let count = file_map + .iter() + .filter(|x| dir_child_reg_expr(name).is_match(x.0)) + .count(); + + if count != 0 { + return Err(Errno::from(libc::ENOTEMPTY)); + } + + file_map.remove(&join_file_path(parent, name)); + Ok(()) + } + + fn get_capacity(&self) -> Result { + Ok(FileSystemCapacity {}) + } +} + +pub(crate) struct MemoryFileReader { + pub(crate) data: Arc>>, +} + +#[async_trait] +impl FileReader for MemoryFileReader { + async fn read(&mut self, offset: u64, size: u32) -> Result { + let v = self.data.lock().unwrap(); + let start = offset as usize; + let end = usize::min(start + size as usize, v.len()); + if start >= v.len() { + return Ok(Bytes::default()); + } + Ok(v[start..end].to_vec().into()) + } +} + +pub(crate) struct MemoryFileWriter { + pub(crate) data: Arc>>, +} + +#[async_trait] +impl FileWriter for MemoryFileWriter { + async fn write(&mut self, offset: u64, data: &[u8]) -> Result { + let mut v = self.data.lock().unwrap(); + let start = offset as usize; + let end = start + data.len(); + + if v.len() < end { + v.resize(end, 0); + } + v[start..end].copy_from_slice(data); + Ok(data.len() as u32) + } +} + +fn dir_child_reg_expr(name: &str) -> Regex { + let regex_pattern = if name.is_empty() { + r"^[^/]+$".to_string() + } else { + format!(r"^{}/[^/]+$", name) + }; + Regex::new(®ex_pattern).unwrap() +} diff --git a/clients/filesystem-fuse/src/opened_file_manager.rs b/clients/filesystem-fuse/src/opened_file_manager.rs new file mode 100644 index 00000000000..0fc635561fa --- /dev/null +++ b/clients/filesystem-fuse/src/opened_file_manager.rs @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::OpenedFile; +use dashmap::DashMap; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use tokio::sync::Mutex; + +// OpenedFileManager is a manager for opened files. +pub(crate) struct OpenedFileManager { + // file_handle_map is a map of file_handle_id to opned file. + file_handle_map: DashMap>>, + + // file_handle_id_generator is used to generate unique file handle IDs. + handle_id_generator: AtomicU64, +} + +impl OpenedFileManager { + pub fn new() -> Self { + Self { + file_handle_map: Default::default(), + handle_id_generator: AtomicU64::new(1), + } + } + + pub(crate) fn next_handle_id(&self) -> u64 { + self.handle_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + pub(crate) fn put_file(&self, mut file: OpenedFile) -> Arc> { + let file_handle_id = self.next_handle_id(); + file.handle_id = file_handle_id; + let file_handle = Arc::new(Mutex::new(file)); + self.file_handle_map + .insert(file_handle_id, file_handle.clone()); + file_handle + } + + pub(crate) fn get_file(&self, handle_id: u64) -> Option>> { + self.file_handle_map + .get(&handle_id) + .map(|x| x.value().clone()) + } + + pub(crate) fn remove_file(&self, handle_id: u64) -> Option>> { + self.file_handle_map.remove(&handle_id).map(|x| x.1) + } +} diff --git a/clients/filesystem-fuse/src/storage_filesystem.rs b/clients/filesystem-fuse/src/storage_filesystem.rs new file mode 100644 index 00000000000..1d57bd4c72b --- /dev/null +++ b/clients/filesystem-fuse/src/storage_filesystem.rs @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::cloud_storage_filesystem::CloudStorageFileSystem; +use crate::config::Config; +use crate::error::{ErrorCode, GravitinoError}; +use crate::filesystem::{FileStat, FileSystemCapacity, OpenFileFlags, OpenedFile, Result}; +use crate::filesystem::{FileSystemContext, PathFileSystem}; +use crate::memory_filesystem::MemoryFileSystem; +use crate::utils::{GravitinoResult, StorageFileSystemType}; +use async_trait::async_trait; +use opendal::layers::LoggingLayer; +use opendal::{services, Builder, Operator}; + +pub(crate) enum StorageFileSystem { + MemoryStorage(MemoryFileSystem), + CloudStorage(CloudStorageFileSystem), +} + +impl StorageFileSystem { + pub(crate) async fn new( + fs_type: &StorageFileSystemType, + config: &Config, + context: &FileSystemContext, + ) -> GravitinoResult { + match fs_type { + StorageFileSystemType::S3 => { + let builder = services::S3::from_map(config.extent_config.clone()); + let op = Operator::new(builder) + .expect("opendal create failed") + .layer(LoggingLayer::default()) + .finish(); + + let fs = CloudStorageFileSystem::new(op); + Ok(StorageFileSystem::CloudStorage(fs)) + } + _ => Err(ErrorCode::UnSupportedFilesystem + .to_error(format!("Unsupported filesystem type: {}", fs_type))), + } + } +} + +macro_rules! async_call_fun { + ($self:expr, $fun:ident $(, $args:expr)* ) => { + match $self { + StorageFileSystem::MemoryStorage(fs) => fs.$fun($($args),*).await, + StorageFileSystem::CloudStorage(fs) => fs.$fun($($args),*).await, + } + }; +} + +macro_rules! call_fun { + ($self:expr, $fun:ident $(, $args:expr)* ) => { + match $self { + StorageFileSystem::MemoryStorage(fs) => fs.$fun($($args),*), + StorageFileSystem::CloudStorage(fs) => fs.$fun($($args),*), + } + }; +} + +#[async_trait] +impl PathFileSystem for StorageFileSystem { + async fn init(&self) { + async_call_fun!(self, init) + } + + async fn stat(&self, name: &str) -> Result { + async_call_fun!(self, stat, name) + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + async_call_fun!(self, lookup, parent, name) + } + + async fn read_dir(&self, name: &str) -> Result> { + async_call_fun!(self, read_dir, name) + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + async_call_fun!(self, open_file, name, flags) + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + async_call_fun!(self, open_dir, name, flags) + } + + async fn create_file( + &self, + parent: &str, + name: &str, + flags: OpenFileFlags, + ) -> Result { + async_call_fun!(self, create_file, parent, name, flags) + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + async_call_fun!(self, create_dir, parent, name) + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + async_call_fun!(self, set_attr, name, file_stat, flush) + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + async_call_fun!(self, remove_file, parent, name) + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + async_call_fun!(self, remove_dir, parent, name) + } + + fn get_capacity(&self) -> Result { + call_fun!(self, get_capacity) + } +} diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs new file mode 100644 index 00000000000..295c563fcff --- /dev/null +++ b/clients/filesystem-fuse/src/utils.rs @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::error::GravitinoError; +use fuse3::Timestamp; +use std::fmt; +use std::time::SystemTime; + +pub type GravitinoResult = Result; + +pub enum StorageFileSystemType { + S3, +} + +impl fmt::Display for StorageFileSystemType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + StorageFileSystemType::S3 => write!(f, "s3"), + } + } +} + +pub fn join_file_path(parent: &str, name: &str) -> String { + if parent.is_empty() { + name.to_string() + } else { + format!("{}/{}", parent, name) + } +} + +pub fn timestamp_diff_from_now(timestamp: Timestamp) -> i64 { + let now = Timestamp::from(SystemTime::now()); + timestamp.sec - now.sec +} + +pub fn extract_fileset(path: &str) -> Option<(String, String, String)> { + let prefix = "gvfs://fileset/"; + if !path.starts_with(prefix) { + return None; + } + + let path_without_prefix = &path[prefix.len()..]; + + let parts: Vec<&str> = path_without_prefix.split('/').collect(); + + if parts.len() < 3 { + return None; + } + + let catalog = parts[0].to_string(); + let schema = parts[1].to_string(); + let fileset = parts[2].to_string(); + + Some((catalog, schema, fileset)) +} + +pub fn extract_storage_filesystem(path: &str) -> Option<(StorageFileSystemType, String)> { + if let Some(pos) = path.find("://") { + let protocol = &path[..pos]; + let location = &path[pos + 3..]; + match protocol { + "s3" => Some((StorageFileSystemType::S3, location.to_string())), + _ => None, + } + } else { + None + } +} diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs new file mode 100644 index 00000000000..577f442d5ff --- /dev/null +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use gvfs_fuse::fuse_server::FuseServer; +use log::info; +use std::fs; +use std::fs::File; +use std::io::Write; +use std::os::unix::fs::MetadataExt; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::thread::sleep; +use std::time::{Duration, Instant}; +use tokio::runtime::Runtime; + +struct FuseTest { + server: Arc, + runtime: Arc, + mount_point: String, +} + +impl FuseTest { + pub fn setup(&self) { + info!("Start gvfs fuse server"); + let svr = self.server.clone(); + self.runtime.spawn(async move { svr.start().await }); + + let success = Self::wait_for_fuse_server_ready(&self.mount_point, Duration::from_secs(15)); + assert!(success, "Fuse server cannot start up at 15 seconds"); + } + + pub fn shutdown(&self) { + self.runtime.block_on(async { + let _ = self.server.stop().await; + }); + } + + fn wait_for_fuse_server_ready(path: &str, timeout: Duration) -> bool { + let test_file = format!("{}/.gvfs_meta", path); + let start_time = Instant::now(); + + while start_time.elapsed() < timeout { + if let Ok(exists) = fs::exists(&test_file) { + info!("Wait for fuse server ready: {}", exists); + if exists { + return true; + } + } + sleep(Duration::from_secs(1)); + } + false + } +} + +impl Drop for FuseTest { + fn drop(&mut self) { + info!("Shutdown fuse server"); + self.shutdown(); + } +} + +#[test] +fn test_fuse_system_with_auto() { + tracing_subscriber::fmt() + .with_env_filter("gvfs_fuse=debug,info") + .init(); + + let mount_point = "build/gvfs"; + let _ = fs::create_dir_all(mount_point); + + let test = FuseTest { + server: Arc::new(FuseServer::new(mount_point)), + runtime: Arc::new(Runtime::new().expect("")), + mount_point: mount_point.to_string(), + }; + test.setup(); + + test_fuse_filesystem(mount_point); +} + +fn test_fuse_system_with_manual() { + test_fuse_filesystem("build/gvfs"); +} + +fn test_fuse_filesystem(mount_point: &str) { + info!("Test startup"); + let base_path = Path::new(mount_point); + + //test create file + let test_file = base_path.join("test_create"); + let file = File::create(&test_file).expect("Failed to create file"); + assert!(file.metadata().is_ok(), "Failed to get file metadata"); + assert!(fs::exists(&test_file).expect("File is not created")); + + //test write file + fs::write(&test_file, "read test").expect("Failed to write file"); + + //test read file + let content = fs::read_to_string(test_file.clone()).expect("Failed to read file"); + assert_eq!(content, "read test", "File content mismatch"); + + //test delete file + fs::remove_file(test_file.clone()).expect("Failed to delete file"); + assert!(!fs::exists(test_file).expect("File is not deleted")); + + //test create directory + let test_dir = base_path.join("test_dir"); + fs::create_dir(&test_dir).expect("Failed to create directory"); + + //test create file in directory + let test_file = base_path.join("test_dir/test_file"); + let file = File::create(&test_file).expect("Failed to create file"); + assert!(file.metadata().is_ok(), "Failed to get file metadata"); + + //test write file in directory + let test_file = base_path.join("test_dir/test_read"); + fs::write(&test_file, "read test").expect("Failed to write file"); + + //test read file in directory + let content = fs::read_to_string(&test_file).expect("Failed to read file"); + assert_eq!(content, "read test", "File content mismatch"); + + //test delete file in directory + fs::remove_file(&test_file).expect("Failed to delete file"); + assert!(!fs::exists(&test_file).expect("File is not deleted")); + + //test delete directory + fs::remove_dir_all(&test_dir).expect("Failed to delete directory"); + assert!(!fs::exists(&test_dir).expect("Directory is not deleted")); + + info!("Success test") +} diff --git a/clients/filesystem-fuse/tools/create_fileset.py b/clients/filesystem-fuse/tools/create_fileset.py new file mode 100644 index 00000000000..139597f9cb0 --- /dev/null +++ b/clients/filesystem-fuse/tools/create_fileset.py @@ -0,0 +1,2 @@ + + diff --git a/clients/filesystem-fuse/tools/run.py b/clients/filesystem-fuse/tools/run.py new file mode 100644 index 00000000000..8f7fe6a763c --- /dev/null +++ b/clients/filesystem-fuse/tools/run.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +with open('gvfs/a5.txt', 'w') as file: + file.write('Hello, world!\n') + +print("File written successfully!") +