From cefe316bbfd0b22faa46c087100b378a37023b42 Mon Sep 17 00:00:00 2001
From: Qi Yu
Date: Thu, 24 Oct 2024 11:25:49 +0800
Subject: [PATCH] [#5188] feat(python-client): Support s3 fileset in python client (#5209)

### What changes were proposed in this pull request?

Add support for S3 filesets in the Python client.

### Why are the changes needed?

Users need it to access S3 filesets from the Python client.

Fix: #5188

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Replaced the placeholder S3 account settings with a real account and executed the following test:

./gradlew :clients:client-python:test -PskipDockerTests=false

---
 .../gravitino/filesystem/gvfs.py              | 114 +++++--
 .../gravitino/filesystem/gvfs_config.py       |   8 +-
 clients/client-python/requirements.txt        |   3 +-
 .../tests/integration/test_gvfs_with_gcs.py   | 150 +++++++--
 .../tests/integration/test_gvfs_with_hdfs.py  |  95 +++---
 .../tests/integration/test_gvfs_with_s3.py    | 299 ++++++++++++++++++
 6 files changed, 565 insertions(+), 104 deletions(-)
 create mode 100644 clients/client-python/tests/integration/test_gvfs_with_s3.py

diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py
index 8f1b2008ab9..a9201a83326 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,7 +14,6 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import os
 from enum import Enum
 from pathlib import PurePosixPath
 from typing import Dict, Tuple
@@ -49,6 +48,7 @@ class StorageType(Enum):
     HDFS = "hdfs"
     LOCAL = "file"
     GCS = "gs"
+    S3A = "s3a"


 class FilesetContextPair:
@@ -314,7 +314,11 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
-        if storage_type in [StorageType.HDFS, StorageType.GCS]:
+        if storage_type in [
+            StorageType.HDFS,
+            StorageType.GCS,
+            StorageType.S3A,
+        ]:
             src_context_pair.filesystem().mv(
                 self._strip_storage_protocol(storage_type, src_actual_path),
                 self._strip_storage_protocol(storage_type, dst_actual_path),
@@ -336,6 +340,10 @@ def _rm(self, path):
             "Deprecated method, use `rm_file` method instead."
         )

+    def lazy_load_class(self, module_name, class_name):
+        module = importlib.import_module(module_name)
+        return getattr(module, class_name)
+
     def rm(self, path, recursive=False, maxdepth=None):
         """Remove a file or directory.
         :param path: Virtual fileset path
@@ -348,11 +356,17 @@ def rm(self, path, recursive=False, maxdepth=None):
         )
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
-        context_pair.filesystem().rm(
-            self._strip_storage_protocol(storage_type, actual_path),
-            recursive,
-            maxdepth,
-        )
+        fs = context_pair.filesystem()
+
+        # S3FileSystem doesn't support maxdepth
+        if isinstance(fs, self.lazy_load_class("s3fs", "S3FileSystem")):
+            fs.rm(self._strip_storage_protocol(storage_type, actual_path), recursive)
+        else:
+            fs.rm(
+                self._strip_storage_protocol(storage_type, actual_path),
+                recursive,
+                maxdepth,
+            )

     def rm_file(self, path):
         """Remove a file.
@@ -547,9 +561,11 @@ def _convert_actual_path(
         """
         # If the storage path starts with hdfs, gcs, we should use the path as the prefix.
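        # For example (hypothetical names): given the storage location
        # "s3a://bucket/catalog/schema/fileset" and the virtual location
        # "fileset/catalog/schema/fileset", the actual path
        # "s3a://bucket/catalog/schema/fileset/sub/a.txt" converts to the
        # virtual path "fileset/catalog/schema/fileset/sub/a.txt".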
-        if storage_location.startswith(
-            f"{StorageType.HDFS.value}://"
-        ) or storage_location.startswith(f"{StorageType.GCS.value}://"):
+        if (
+            storage_location.startswith(f"{StorageType.HDFS.value}://")
+            or storage_location.startswith(f"{StorageType.GCS.value}://")
+            or storage_location.startswith(f"{StorageType.S3A.value}://")
+        ):
             actual_prefix = infer_storage_options(storage_location)["path"]
         elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
             actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
@@ -586,11 +602,34 @@ def _convert_actual_info(
         path = self._convert_actual_path(
             entry["name"], storage_location, virtual_location
         )
+
+        # Entries from HDFS and GCS carry an 'mtime' field, while entries from
+        # S3 and OSS carry a 'LastModified' field; normalize both to 'mtime'.
+        if "mtime" in entry:
+            # HDFS and GCS
+            return {
+                "name": path,
+                "size": entry["size"],
+                "type": entry["type"],
+                "mtime": entry["mtime"],
+            }
+
+        if "LastModified" in entry:
+            # S3 and OSS
+            return {
+                "name": path,
+                "size": entry["size"],
+                "type": entry["type"],
+                "mtime": entry["LastModified"],
+            }
+
+        # Unknown
         return {
             "name": path,
             "size": entry["size"],
             "type": entry["type"],
-            "mtime": entry["mtime"],
+            "mtime": None,
         }

     def _get_fileset_context(self, virtual_path: str, operation: FilesetDataOperation):
@@ -692,6 +731,8 @@ def _recognize_storage_type(path: str):
             return StorageType.LOCAL
         if path.startswith(f"{StorageType.GCS.value}://"):
             return StorageType.GCS
+        if path.startswith(f"{StorageType.S3A.value}://"):
+            return StorageType.S3A
         raise GravitinoRuntimeException(
             f"Storage type doesn't support now. Path:{path}"
         )
@@ -716,7 +757,7 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
         :param path: The path
         :return: The stripped path
         """
-        if storage_type in (StorageType.HDFS, StorageType.GCS):
+        if storage_type in (StorageType.HDFS, StorageType.GCS, StorageType.S3A):
             return path
         if storage_type == StorageType.LOCAL:
             return path[len(f"{StorageType.LOCAL.value}:") :]
@@ -791,7 +832,9 @@ def _get_filesystem(self, actual_file_location: str):
         elif storage_type == StorageType.LOCAL:
             fs = LocalFileSystem()
         elif storage_type == StorageType.GCS:
-            fs = ArrowFSWrapper(self._get_gcs_filesystem())
+            fs = self._get_gcs_filesystem()
+        elif storage_type == StorageType.S3A:
+            fs = self._get_s3_filesystem()
         else:
             raise GravitinoRuntimeException(
                 f"Storage type: `{storage_type}` doesn't support now."
@@ -802,22 +845,47 @@ def _get_gcs_filesystem(self):
-        # get All keys from the options that start with 'gravitino.bypass.gcs.' and remove the prefix
-        gcs_options = {
-            key[len(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS) :]: value
-            for key, value in self._options.items()
-            if key.startswith(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS)
-        }
-        # get 'service-account-key' from gcs_options, if the key is not found, throw an exception
-        service_account_key_path = gcs_options.get(GVFSConfig.GVFS_FILESYSTEM_KEY_FILE)
+        service_account_key_path = self._options.get(
+            GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE
+        )
         if service_account_key_path is None:
             raise GravitinoRuntimeException(
                 "Service account key is not found in the options."
) - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path + return importlib.import_module("gcsfs").GCSFileSystem( + token=service_account_key_path + ) - return importlib.import_module("pyarrow.fs").GcsFileSystem() + def _get_s3_filesystem(self): + # get 'aws_access_key_id' from s3_options, if the key is not found, throw an exception + aws_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY) + if aws_access_key_id is None: + raise GravitinoRuntimeException( + "AWS access key id is not found in the options." + ) + + # get 'aws_secret_access_key' from s3_options, if the key is not found, throw an exception + aws_secret_access_key = self._options.get( + GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY + ) + if aws_secret_access_key is None: + raise GravitinoRuntimeException( + "AWS secret access key is not found in the options." + ) + + # get 'aws_endpoint_url' from s3_options, if the key is not found, throw an exception + aws_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT) + if aws_endpoint_url is None: + raise GravitinoRuntimeException( + "AWS endpoint url is not found in the options." + ) + + return importlib.import_module("s3fs").S3FileSystem( + key=aws_access_key_id, + secret=aws_secret_access_key, + endpoint_url=aws_endpoint_url, + ) fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem) diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py b/clients/client-python/gravitino/filesystem/gvfs_config.py index 618565c70eb..7ffacdb095d 100644 --- a/clients/client-python/gravitino/filesystem/gvfs_config.py +++ b/clients/client-python/gravitino/filesystem/gvfs_config.py @@ -32,6 +32,8 @@ class GVFSConfig: OAUTH2_PATH = "oauth2_path" OAUTH2_SCOPE = "oauth2_scope" - GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass" - GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs." 
- GVFS_FILESYSTEM_KEY_FILE = "service-account-key-path" + GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE = "gcs_service_account_key_path" + + GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key" + GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_key" + GVFS_FILESYSTEM_S3_ENDPOINT = "s3_endpoint" diff --git a/clients/client-python/requirements.txt b/clients/client-python/requirements.txt index a330f738a1f..1d0f4fadd5d 100644 --- a/clients/client-python/requirements.txt +++ b/clients/client-python/requirements.txt @@ -23,4 +23,5 @@ readerwriterlock==1.0.9 fsspec==2024.3.1 pyarrow==15.0.2 cachetools==5.3.3 -google-auth==2.35.0 \ No newline at end of file +gcsfs==2024.3.1 +s3fs==2024.3.1 \ No newline at end of file diff --git a/clients/client-python/tests/integration/test_gvfs_with_gcs.py b/clients/client-python/tests/integration/test_gvfs_with_gcs.py index 16f84dff3b1..54a2cfd07e5 100644 --- a/clients/client-python/tests/integration/test_gvfs_with_gcs.py +++ b/clients/client-python/tests/integration/test_gvfs_with_gcs.py @@ -20,8 +20,8 @@ from random import randint import unittest -from fsspec.implementations.arrow import ArrowFSWrapper -from pyarrow.fs import GcsFileSystem +from gcsfs import GCSFileSystem + from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS from gravitino import ( @@ -31,7 +31,7 @@ Fileset, ) from gravitino.exceptions.base import GravitinoRuntimeException - +from gravitino.filesystem.gvfs_config import GVFSConfig logger = logging.getLogger(__name__) @@ -45,7 +45,9 @@ class TestGvfsWithGCS(TestGvfsWithHDFS): metalake_name: str = "TestGvfsWithGCS_metalake" + str(randint(1, 10000)) def setUp(self): - self.options = {"gravitino.bypass.gcs.service-account-key-path": self.key_file} + self.options = { + f"{GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE}": self.key_file + } def tearDown(self): self.options = {} @@ -129,9 +131,22 @@ def _init_test_entities(cls): properties=cls.fileset_properties, ) - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cls.key_file - arrow_gcs_fs = GcsFileSystem() - cls.fs = ArrowFSWrapper(arrow_gcs_fs) + cls.fs = GCSFileSystem(token=cls.key_file) + + # Object storage like GCS does not support making directory and can only create + # objects under the bucket. So we need to skip the test for GCS. + def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance): + # GCS will not create a directory, so the directory will not exist. + self.fs.mkdir(actual_dir) + self.assertFalse(self.fs.exists(actual_dir)) + self.assertFalse(gvfs_instance.exists(gvfs_dir)) + + # Object storage like GCS does not support making directory and can only create + # objects under the bucket. So we need to skip the test for GCS. + def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance): + self.fs.makedirs(actual_dir) + self.assertFalse(self.fs.exists(actual_dir)) + self.assertFalse(gvfs_instance.exists(gvfs_dir)) def test_modified(self): modified_dir = self.fileset_gvfs_location + "/test_modified" @@ -142,17 +157,15 @@ def test_modified(self): options=self.options, **self.conf, ) - self.fs.mkdir(modified_actual_dir) - self.assertTrue(self.fs.exists(modified_actual_dir)) - self.assertTrue(fs.exists(modified_dir)) + self.check_mkdir(modified_dir, modified_actual_dir, fs) # GCP only supports getting the `object` modify time, so the modified time will be None # if it's a directory. 
# >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3') # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3') # >>> print(r) # None - self.assertIsNone(fs.modified(modified_dir)) + # self.assertIsNone(fs.modified(modified_dir)) # create a file under the dir 'modified_dir'. file_path = modified_dir + "/test.txt" @@ -160,14 +173,107 @@ def test_modified(self): self.assertTrue(fs.exists(file_path)) self.assertIsNotNone(fs.modified(file_path)) - @unittest.skip( - "This test will fail for https://github.com/apache/arrow/issues/44438" - ) - def test_pandas(self): - pass - - @unittest.skip( - "This test will fail for https://github.com/apache/arrow/issues/44438" - ) - def test_pyarrow(self): - pass + def test_rm(self): + rm_dir = self.fileset_gvfs_location + "/test_rm" + rm_actual_dir = self.fileset_storage_location + "/test_rm" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rm_dir, rm_actual_dir, fs) + + rm_file = self.fileset_gvfs_location + "/test_rm/test.file" + rm_actual_file = self.fileset_storage_location + "/test_rm/test.file" + fs.touch(rm_file) + self.assertTrue(self.fs.exists(rm_actual_file)) + self.assertTrue(fs.exists(rm_file)) + + # test delete file + fs.rm(rm_file) + self.assertFalse(fs.exists(rm_file)) + + # test delete dir with recursive = false + rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file" + rm_new_actual_file = self.fileset_storage_location + "/test_rm/test_new.file" + self.fs.touch(rm_new_actual_file) + self.assertTrue(self.fs.exists(rm_new_actual_file)) + self.assertTrue(fs.exists(rm_new_file)) + # fs.rm(rm_dir) + + # fs.rm(rm_dir, recursive=False) will delete the directory and the file + # directly under the directory, so we comment the following code. + # test delete dir with recursive = true + # fs.rm(rm_dir, recursive=True) + # self.assertFalse(fs.exists(rm_dir)) + + def test_rmdir(self): + rmdir_dir = self.fileset_gvfs_location + "/test_rmdir" + rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs) + + rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file" + rmdir_actual_file = self.fileset_storage_location + "/test_rmdir/test.file" + self.fs.touch(rmdir_actual_file) + self.assertTrue(self.fs.exists(rmdir_actual_file)) + self.assertTrue(fs.exists(rmdir_file)) + + # test delete file, GCS will remove the file directly. + fs.rmdir(rmdir_file) + + def test_mkdir(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. 
+ new_bucket = self.bucket_name + "1" + mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket) + mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket) + fs.mkdir(mkdir_dir, create_parents=True) + + self.assertFalse(self.fs.exists(mkdir_actual_dir)) + self.assertFalse(fs.exists(mkdir_dir)) + self.assertFalse(self.fs.exists("gs://" + new_bucket)) + + def test_makedirs(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. + new_bucket = self.bucket_name + "1" + mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket) + mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket) + + # it takes no effect. + fs.makedirs(mkdir_dir) + + self.assertFalse(self.fs.exists(mkdir_actual_dir)) + self.assertFalse(fs.exists(mkdir_dir)) + self.assertFalse(self.fs.exists("gs://" + new_bucket)) diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py index 5be5914f15b..8b1c367bc5c 100644 --- a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py +++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py @@ -239,8 +239,8 @@ def test_ls(self): options=self.options, **self.conf, ) - self.fs.mkdir(ls_actual_dir) - self.assertTrue(self.fs.exists(ls_actual_dir)) + + self.check_mkdir(ls_dir, ls_actual_dir, fs) ls_file = self.fileset_gvfs_location + "/test_ls/test.file" ls_actual_file = self.fileset_storage_location + "/test_ls/test.file" @@ -266,8 +266,8 @@ def test_info(self): options=self.options, **self.conf, ) - self.fs.mkdir(info_actual_dir) - self.assertTrue(self.fs.exists(info_actual_dir)) + + self.check_mkdir(info_dir, info_actual_dir, fs) info_file = self.fileset_gvfs_location + "/test_info/test.file" info_actual_file = self.fileset_storage_location + "/test_info/test.file" @@ -289,9 +289,7 @@ def test_exist(self): options=self.options, **self.conf, ) - self.fs.mkdir(exist_actual_dir) - self.assertTrue(self.fs.exists(exist_actual_dir)) - self.assertTrue(fs.exists(exist_dir)) + self.check_mkdir(exist_dir, exist_actual_dir, fs) exist_file = self.fileset_gvfs_location + "/test_exist/test.file" exist_actual_file = self.fileset_storage_location + "/test_exist/test.file" @@ -308,9 +306,8 @@ def test_cp_file(self): options=self.options, **self.conf, ) - self.fs.mkdir(cp_file_actual_dir) - self.assertTrue(self.fs.exists(cp_file_actual_dir)) - self.assertTrue(fs.exists(cp_file_dir)) + + self.check_mkdir(cp_file_dir, cp_file_actual_dir, fs) cp_file_file = self.fileset_gvfs_location + "/test_cp_file/test.file" cp_file_actual_file = self.fileset_storage_location + "/test_cp_file/test.file" @@ -341,9 +338,7 @@ def test_mv(self): options=self.options, **self.conf, ) - self.fs.mkdir(mv_actual_dir) - self.assertTrue(self.fs.exists(mv_actual_dir)) - self.assertTrue(fs.exists(mv_dir)) + self.check_mkdir(mv_dir, mv_actual_dir, fs) mv_new_dir = self.fileset_gvfs_location + "/test_mv_new" mv_new_actual_dir = self.fileset_storage_location + "/test_mv_new" @@ -353,9 +348,8 @@ def test_mv(self): options=self.options, **self.conf, ) - self.fs.mkdir(mv_new_actual_dir) - 
self.assertTrue(self.fs.exists(mv_new_actual_dir)) - self.assertTrue(fs.exists(mv_new_dir)) + + self.check_mkdir(mv_new_dir, mv_new_actual_dir, fs) mv_file = self.fileset_gvfs_location + "/test_mv/test.file" mv_actual_file = self.fileset_storage_location + "/test_mv/test.file" @@ -385,9 +379,7 @@ def test_rm(self): options=self.options, **self.conf, ) - self.fs.mkdir(rm_actual_dir) - self.assertTrue(self.fs.exists(rm_actual_dir)) - self.assertTrue(fs.exists(rm_dir)) + self.check_mkdir(rm_dir, rm_actual_dir, fs) rm_file = self.fileset_gvfs_location + "/test_rm/test.file" rm_actual_file = self.fileset_storage_location + "/test_rm/test.file" @@ -421,9 +413,7 @@ def test_rm_file(self): options=self.options, **self.conf, ) - self.fs.mkdir(rm_file_actual_dir) - self.assertTrue(self.fs.exists(rm_file_actual_dir)) - self.assertTrue(fs.exists(rm_file_dir)) + self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs) rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file" rm_file_actual_file = self.fileset_storage_location + "/test_rm_file/test.file" @@ -448,9 +438,7 @@ def test_rmdir(self): options=self.options, **self.conf, ) - self.fs.mkdir(rmdir_actual_dir) - self.assertTrue(self.fs.exists(rmdir_actual_dir)) - self.assertTrue(fs.exists(rmdir_dir)) + self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs) rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file" rmdir_actual_file = self.fileset_storage_location + "/test_rmdir/test.file" @@ -475,9 +463,7 @@ def test_open(self): options=self.options, **self.conf, ) - self.fs.mkdir(open_actual_dir) - self.assertTrue(self.fs.exists(open_actual_dir)) - self.assertTrue(fs.exists(open_dir)) + self.check_mkdir(open_dir, open_actual_dir, fs) open_file = self.fileset_gvfs_location + "/test_open/test.file" open_actual_file = self.fileset_storage_location + "/test_open/test.file" @@ -503,9 +489,7 @@ def test_mkdir(self): options=self.options, **self.conf, ) - fs.mkdir(mkdir_dir) - self.assertTrue(fs.exists(mkdir_dir)) - self.assertTrue(self.fs.exists(mkdir_actual_dir)) + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) # test mkdir dir with create_parents = false parent_not_exist_virtual_path = mkdir_dir + "/not_exist/sub_dir" @@ -530,9 +514,7 @@ def test_makedirs(self): options=self.options, **self.conf, ) - fs.makedirs(makedirs_dir) - self.assertTrue(fs.exists(makedirs_dir)) - self.assertTrue(self.fs.exists(makedirs_actual_dir)) + self.check_makedirs(makedirs_dir, makedirs_actual_dir, fs) # test mkdir dir not exist parent_not_exist_virtual_path = makedirs_dir + "/not_exist/sub_dir" @@ -549,9 +531,8 @@ def test_created(self): options=self.options, **self.conf, ) - self.fs.mkdir(created_actual_dir) - self.assertTrue(self.fs.exists(created_actual_dir)) - self.assertTrue(fs.exists(created_dir)) + + self.check_mkdir(created_dir, created_actual_dir, fs) with self.assertRaises(GravitinoRuntimeException): fs.created(created_dir) @@ -565,13 +546,22 @@ def test_modified(self): options=self.options, **self.conf, ) - self.fs.mkdir(modified_actual_dir) - self.assertTrue(self.fs.exists(modified_actual_dir)) - self.assertTrue(fs.exists(modified_dir)) + + self.check_mkdir(modified_dir, modified_actual_dir, fs) # test mkdir dir which exists self.assertIsNotNone(fs.modified(modified_dir)) + def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance): + self.fs.mkdir(actual_dir) + self.assertTrue(self.fs.exists(actual_dir)) + self.assertTrue(gvfs_instance.exists(gvfs_dir)) + + def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance): + 
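+        """Create `actual_dir` via makedirs on the backing filesystem and
+        assert it is visible both directly and through gvfs. Object-storage
+        subclasses (GCS, S3) override this with the opposite expectation,
+        since object stores do not materialize empty directories."""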
self.fs.makedirs(actual_dir) + self.assertTrue(self.fs.exists(actual_dir)) + self.assertTrue(gvfs_instance.exists(gvfs_dir)) + def test_cat_file(self): cat_dir = self.fileset_gvfs_location + "/test_cat" cat_actual_dir = self.fileset_storage_location + "/test_cat" @@ -581,9 +571,8 @@ def test_cat_file(self): options=self.options, **self.conf, ) - self.fs.mkdir(cat_actual_dir) - self.assertTrue(self.fs.exists(cat_actual_dir)) - self.assertTrue(fs.exists(cat_dir)) + + self.check_mkdir(cat_dir, cat_actual_dir, fs) cat_file = self.fileset_gvfs_location + "/test_cat/test.file" cat_actual_file = self.fileset_storage_location + "/test_cat/test.file" @@ -609,9 +598,8 @@ def test_get_file(self): options=self.options, **self.conf, ) - self.fs.mkdir(get_actual_dir) - self.assertTrue(self.fs.exists(get_actual_dir)) - self.assertTrue(fs.exists(get_dir)) + + self.check_mkdir(get_dir, get_actual_dir, fs) get_file = self.fileset_gvfs_location + "/test_get/test.file" get_actual_file = self.fileset_storage_location + "/test_get/test.file" @@ -649,9 +637,8 @@ def test_pandas(self): options=self.options, **self.conf, ) - self.fs.mkdir(pands_actual_dir) - self.assertTrue(self.fs.exists(pands_actual_dir)) - self.assertTrue(fs.exists(pands_dir)) + + self.check_mkdir(pands_dir, pands_actual_dir, fs) data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]}) # to parquet @@ -695,9 +682,8 @@ def test_pyarrow(self): options=self.options, **self.conf, ) - self.fs.mkdir(pyarrow_actual_dir) - self.assertTrue(self.fs.exists(pyarrow_actual_dir)) - self.assertTrue(fs.exists(pyarrow_dir)) + + self.check_mkdir(pyarrow_dir, pyarrow_actual_dir, fs) data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]}) # to parquet @@ -725,9 +711,8 @@ def test_llama_index(self): options=self.options, **self.conf, ) - self.fs.mkdir(llama_actual_dir) - self.assertTrue(self.fs.exists(llama_actual_dir)) - self.assertTrue(fs.exists(llama_dir)) + self.check_mkdir(llama_dir, llama_actual_dir, fs) + data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]}) storage_options = { diff --git a/clients/client-python/tests/integration/test_gvfs_with_s3.py b/clients/client-python/tests/integration/test_gvfs_with_s3.py new file mode 100644 index 00000000000..5758a7e6580 --- /dev/null +++ b/clients/client-python/tests/integration/test_gvfs_with_s3.py @@ -0,0 +1,299 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
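+
+# These tests mirror TestGvfsWithHDFS against an S3 bucket through s3fs.
+# Directory-related expectations are overridden because S3 is an object
+# store and does not materialize empty directories.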
+
+import logging
+import os
+from random import randint
+import unittest
+
+from s3fs import S3FileSystem
+
+from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
+from gravitino import (
+    gvfs,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.exceptions.base import GravitinoRuntimeException
+from gravitino.filesystem.gvfs_config import GVFSConfig
+
+logger = logging.getLogger(__name__)
+
+
+@unittest.skip("This test requires an S3 service account")
+class TestGvfsWithS3(TestGvfsWithHDFS):
+    # Before running this test, please make sure aws-bundle-x.jar has been
+    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
+    s3_access_key = "your_access_key"
+    s3_secret_key = "your_secret_key"
+    s3_endpoint = "your_endpoint"
+    bucket_name = "your_bucket_name"
+
+    metalake_name: str = "TestGvfsWithS3_metalake" + str(randint(1, 10000))
+
+    def setUp(self):
+        self.options = {
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
+            f"{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
+        }
+
+    def tearDown(self):
+        self.options = {}
+
+    @classmethod
+    def setUpClass(cls):
+        cls._get_gravitino_home()
+
+        cls.hadoop_conf_path = f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+        # restart the server
+        cls.restart_server()
+        # create entity
+        cls._init_test_entities()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls._clean_test_data()
+        # reset the server conf in case other ITs (e.g., the HDFS one) have
+        # changed it and failed to reset it
+        cls._reset_conf(cls.config, cls.hadoop_conf_path)
+        # restart server
+        cls.restart_server()
+
+    # clear all config in the conf_path
+    @classmethod
+    def _reset_conf(cls, config, conf_path):
+        logger.info("Reset %s.", conf_path)
+        if not os.path.exists(conf_path):
+            raise GravitinoRuntimeException(f"Conf file is not found at `{conf_path}`.")
+        filtered_lines = []
+        with open(conf_path, mode="r", encoding="utf-8") as file:
+            origin_lines = file.readlines()
+
+        for line in origin_lines:
+            line = line.strip()
+            if line.startswith("#"):
+                # keep comment lines as-is
+                filtered_lines.append(line + "\n")
+
+        with open(conf_path, mode="w", encoding="utf-8") as file:
+            for line in filtered_lines:
+                file.write(line)
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls.metalake_name
+        )
+
+        cls.config = {}
+        cls.conf = {}
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={
+                "filesystem-providers": "s3",
+                "gravitino.bypass.fs.s3a.access.key": cls.s3_access_key,
+                "gravitino.bypass.fs.s3a.secret.key": cls.s3_secret_key,
+                "gravitino.bypass.fs.s3a.endpoint": cls.s3_endpoint,
+            },
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            f"s3a://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+
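+        # The tests use this plain s3fs filesystem (bypassing gvfs) to verify
+        # that operations issued through the virtual filesystem land on the
+        # expected objects in the bucket.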
cls.fs = S3FileSystem( + key=cls.s3_access_key, + secret=cls.s3_secret_key, + endpoint_url=cls.s3_endpoint, + ) + + def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance): + # S3 will not create a directory, so the directory will not exist. + self.fs.mkdir(actual_dir) + self.assertFalse(self.fs.exists(actual_dir)) + self.assertFalse(gvfs_instance.exists(gvfs_dir)) + + def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance): + self.fs.makedirs(actual_dir) + self.assertFalse(self.fs.exists(actual_dir)) + self.assertFalse(gvfs_instance.exists(gvfs_dir)) + + def test_modified(self): + modified_dir = self.fileset_gvfs_location + "/test_modified" + modified_actual_dir = self.fileset_storage_location + "/test_modified" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(modified_dir, modified_actual_dir, fs) + # S3 only supports getting the `object` modify time, so the modified time will be None + # if it's a directory. + # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3') + # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3') + # >>> print(r) + # None + # self.assertIsNone(fs.modified(modified_dir)) + + # create a file under the dir 'modified_dir'. + file_path = modified_dir + "/test.txt" + fs.touch(file_path) + self.assertTrue(fs.exists(file_path)) + self.assertIsNotNone(fs.modified(file_path)) + + def test_rm(self): + rm_dir = self.fileset_gvfs_location + "/test_rm" + rm_actual_dir = self.fileset_storage_location + "/test_rm" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rm_dir, rm_actual_dir, fs) + + rm_file = self.fileset_gvfs_location + "/test_rm/test.file" + rm_actual_file = self.fileset_storage_location + "/test_rm/test.file" + fs.touch(rm_file) + self.assertTrue(self.fs.exists(rm_actual_file)) + self.assertTrue(fs.exists(rm_file)) + + # test delete file + fs.rm(rm_file) + self.assertFalse(fs.exists(rm_file)) + + # test delete dir with recursive = false + rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file" + rm_new_actual_file = self.fileset_storage_location + "/test_rm/test_new.file" + self.fs.touch(rm_new_actual_file) + self.assertTrue(self.fs.exists(rm_new_actual_file)) + self.assertTrue(fs.exists(rm_new_file)) + + def test_rmdir(self): + rmdir_dir = self.fileset_gvfs_location + "/test_rmdir" + rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs) + + rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file" + rmdir_actual_file = self.fileset_storage_location + "/test_rmdir/test.file" + self.fs.touch(rmdir_actual_file) + self.assertTrue(self.fs.exists(rmdir_actual_file)) + self.assertTrue(fs.exists(rmdir_file)) + + fs.rm_file(rmdir_file) + + def test_mkdir(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. 
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. + new_bucket = self.bucket_name + "1" + mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket) + fs.mkdir(mkdir_dir, create_parents=True) + + self.assertFalse(fs.exists(mkdir_dir)) + self.assertFalse(self.fs.exists("s3://" + new_bucket)) + + def test_makedirs(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. + new_bucket = self.bucket_name + "1" + mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket) + mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket) + + # it takes no effect. + fs.makedirs(mkdir_dir) + with self.assertRaises(OSError): + self.fs.exists(mkdir_actual_dir) + + self.assertFalse(fs.exists(mkdir_dir)) + self.assertFalse(self.fs.exists("s3://" + new_bucket)) + + def test_rm_file(self): + rm_file_dir = self.fileset_gvfs_location + "/test_rm_file" + rm_file_actual_dir = self.fileset_storage_location + "/test_rm_file" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs) + + rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file" + rm_file_actual_file = self.fileset_storage_location + "/test_rm_file/test.file" + self.fs.touch(rm_file_actual_file) + self.assertTrue(self.fs.exists(rm_file_actual_file)) + self.assertTrue(fs.exists(rm_file_file)) + + # test delete file + fs.rm_file(rm_file_file) + self.assertFalse(fs.exists(rm_file_file)) + + # test delete dir + fs.rm_file(rm_file_dir)
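
For reference, a minimal usage sketch of the new S3 support (the server URI, metalake, catalog/schema/fileset names, and credentials below are placeholders, and the fileset is assumed to already exist, e.g. as created in the test above):

import fsspec

from gravitino import gvfs
from gravitino.filesystem.gvfs_config import GVFSConfig

# Credentials and the endpoint are passed through the new option keys.
options = {
    GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY: "your_access_key",
    GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY: "your_secret_key",
    GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT: "your_endpoint",
}

# Instantiate the virtual filesystem directly ...
fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="your_metalake",
    options=options,
)
fs.touch("gvfs://fileset/your_catalog/your_schema/your_fileset/test.txt")
print(fs.ls("gvfs://fileset/your_catalog/your_schema/your_fileset"))

# ... or through fsspec, since importing gvfs registers the "gvfs" protocol.
fs2 = fsspec.filesystem(
    "gvfs",
    server_uri="http://localhost:8090",
    metalake_name="your_metalake",
    options=options,
)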