diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py index a9201a83326..8176c325a81 100644 --- a/clients/client-python/gravitino/filesystem/gvfs.py +++ b/clients/client-python/gravitino/filesystem/gvfs.py @@ -49,6 +49,7 @@ class StorageType(Enum): LOCAL = "file" GCS = "gs" S3A = "s3a" + OSS = "oss" class FilesetContextPair: @@ -318,6 +319,7 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): StorageType.HDFS, StorageType.GCS, StorageType.S3A, + StorageType.OSS, ]: src_context_pair.filesystem().mv( self._strip_storage_protocol(storage_type, src_actual_path), @@ -567,6 +569,14 @@ def _convert_actual_path( or storage_location.startswith(f"{StorageType.S3A.value}://") ): actual_prefix = infer_storage_options(storage_location)["path"] + elif storage_location.startswith(f"{StorageType.OSS.value}:/"): + ops = infer_storage_options(storage_location) + if "host" not in ops or "path" not in ops: + raise GravitinoRuntimeException( + f"Storage location:{storage_location} doesn't support now." + ) + + actual_prefix = ops["host"] + ops["path"] elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"): actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :] else: @@ -733,6 +743,8 @@ def _recognize_storage_type(path: str): return StorageType.GCS if path.startswith(f"{StorageType.S3A.value}://"): return StorageType.S3A + if path.startswith(f"{StorageType.OSS.value}://"): + return StorageType.OSS raise GravitinoRuntimeException( f"Storage type doesn't support now. Path:{path}" ) @@ -756,12 +768,46 @@ def _strip_storage_protocol(storage_type: StorageType, path: str): :param storage_type: The storage type :param path: The path :return: The stripped path + + We will handle OSS differently from S3 and GCS, because OSS has different behavior than S3 and GCS. + Please see the following example: + + ``` + >> oss = context_pair.filesystem() + >> oss.ls('oss://bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls') + DEBUG:ossfs:Get directory listing page for bucket-xiaoyu/test_gvfs_catalog678/ + test_gvfs_schema/test_gvfs_fileset + DEBUG:ossfs:CALL: ObjectIterator - () - {'prefix': 'test_gvfs_catalog678/test_gvfs_schema + /test_gvfs_fileset/', 'delimiter': '/'} + [] + >> oss.ls('bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls') + DEBUG:ossfs:Get directory listing page for bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema + /test_gvfs_fileset/test_ls + DEBUG:ossfs:CALL: ObjectIterator - () - {'prefix': 'test_gvfs_catalog678/test_gvfs_schema + /test_gvfs_fileset/test_ls/', 'delimiter': '/'} + [{'name': 'bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls + /test.file', 'type': 'file', 'size': 0, 'LastModified': 1729754793, + 'Size': 0, 'Key': 'bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/ + test_gvfs_fileset/test_ls/test.file'}] + + ``` + + Please take a look at the above example: if we do not remove the protocol (starts with oss://), + it will always return an empty array when we call `oss.ls`, however, if we remove the protocol, + it will produce the correct result as expected. """ if storage_type in (StorageType.HDFS, StorageType.GCS, StorageType.S3A): return path if storage_type == StorageType.LOCAL: return path[len(f"{StorageType.LOCAL.value}:") :] + # OSS has different behavior than S3 and GCS, if we do not remove the + # protocol, it will always return an empty array. + if storage_type == StorageType.OSS: + if path.startswith(f"{StorageType.OSS.value}://"): + return path[len(f"{StorageType.OSS.value}://") :] + return path + raise GravitinoRuntimeException( f"Storage type:{storage_type} doesn't support now." ) @@ -835,6 +881,8 @@ def _get_filesystem(self, actual_file_location: str): fs = self._get_gcs_filesystem() elif storage_type == StorageType.S3A: fs = self._get_s3_filesystem() + elif storage_type == StorageType.OSS: + fs = self._get_oss_filesystem() else: raise GravitinoRuntimeException( f"Storage type: `{storage_type}` doesn't support now." @@ -887,5 +935,35 @@ def _get_s3_filesystem(self): endpoint_url=aws_endpoint_url, ) + def _get_oss_filesystem(self): + # get 'oss_access_key_id' from oss options, if the key is not found, throw an exception + oss_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY) + if oss_access_key_id is None: + raise GravitinoRuntimeException( + "OSS access key id is not found in the options." + ) + + # get 'oss_secret_access_key' from oss options, if the key is not found, throw an exception + oss_secret_access_key = self._options.get( + GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY + ) + if oss_secret_access_key is None: + raise GravitinoRuntimeException( + "OSS secret access key is not found in the options." + ) + + # get 'oss_endpoint_url' from oss options, if the key is not found, throw an exception + oss_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT) + if oss_endpoint_url is None: + raise GravitinoRuntimeException( + "OSS endpoint url is not found in the options." + ) + + return importlib.import_module("ossfs").OSSFileSystem( + key=oss_access_key_id, + secret=oss_secret_access_key, + endpoint=oss_endpoint_url, + ) + fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem) diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py b/clients/client-python/gravitino/filesystem/gvfs_config.py index 7ffacdb095d..00ae8c6419e 100644 --- a/clients/client-python/gravitino/filesystem/gvfs_config.py +++ b/clients/client-python/gravitino/filesystem/gvfs_config.py @@ -37,3 +37,7 @@ class GVFSConfig: GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key" GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_key" GVFS_FILESYSTEM_S3_ENDPOINT = "s3_endpoint" + + GVFS_FILESYSTEM_OSS_ACCESS_KEY = "oss_access_key" + GVFS_FILESYSTEM_OSS_SECRET_KEY = "oss_secret_key" + GVFS_FILESYSTEM_OSS_ENDPOINT = "oss_endpoint" diff --git a/clients/client-python/requirements.txt b/clients/client-python/requirements.txt index 1d0f4fadd5d..8eebd572770 100644 --- a/clients/client-python/requirements.txt +++ b/clients/client-python/requirements.txt @@ -24,4 +24,5 @@ fsspec==2024.3.1 pyarrow==15.0.2 cachetools==5.3.3 gcsfs==2024.3.1 -s3fs==2024.3.1 \ No newline at end of file +s3fs==2024.3.1 +ossfs==2023.12.0 \ No newline at end of file diff --git a/clients/client-python/tests/integration/test_gvfs_with_oss.py b/clients/client-python/tests/integration/test_gvfs_with_oss.py new file mode 100644 index 00000000000..95b385ea925 --- /dev/null +++ b/clients/client-python/tests/integration/test_gvfs_with_oss.py @@ -0,0 +1,353 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +import os +from random import randint +import unittest + + +from ossfs import OSSFileSystem + +from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS +from gravitino import ( + gvfs, + GravitinoClient, + Catalog, + Fileset, +) +from gravitino.exceptions.base import GravitinoRuntimeException +from gravitino.filesystem.gvfs_config import GVFSConfig + + +logger = logging.getLogger(__name__) + + +@unittest.skip("This test require oss service account") +class TestGvfsWithOSS(TestGvfsWithHDFS): + # Before running this test, please set the make sure aliyun-bundle-x.jar has been + # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory + oss_access_key = "your_access_key" + oss_secret_key = "your_secret_key" + oss_endpoint = "your_endpoint" + bucket_name = "your_bucket_name" + + metalake_name: str = "TestGvfsWithOSS_metalake" + str(randint(1, 10000)) + + def setUp(self): + self.options = { + f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY}": self.oss_access_key, + f"{GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY}": self.oss_secret_key, + f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT}": self.oss_endpoint, + } + + def tearDown(self): + self.options = {} + + @classmethod + def setUpClass(cls): + cls._get_gravitino_home() + + cls.hadoop_conf_path = f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf" + # restart the server + cls.restart_server() + # create entity + cls._init_test_entities() + + @classmethod + def tearDownClass(cls): + cls._clean_test_data() + # reset server conf in case of other ITs like HDFS has changed it and fail + # to reset it + cls._reset_conf(cls.config, cls.hadoop_conf_path) + # restart server + cls.restart_server() + + # clear all config in the conf_path + @classmethod + def _reset_conf(cls, config, conf_path): + logger.info("Reset %s.", conf_path) + if not os.path.exists(conf_path): + raise GravitinoRuntimeException(f"Conf file is not found at `{conf_path}`.") + filtered_lines = [] + with open(conf_path, mode="r", encoding="utf-8") as file: + origin_lines = file.readlines() + + for line in origin_lines: + line = line.strip() + if line.startswith("#"): + # append annotations directly + filtered_lines.append(line + "\n") + + with open(conf_path, mode="w", encoding="utf-8") as file: + for line in filtered_lines: + file.write(line) + + @classmethod + def _init_test_entities(cls): + cls.gravitino_admin_client.create_metalake( + name=cls.metalake_name, comment="", properties={} + ) + cls.gravitino_client = GravitinoClient( + uri="http://localhost:8090", metalake_name=cls.metalake_name + ) + + cls.config = {} + cls.conf = {} + catalog = cls.gravitino_client.create_catalog( + name=cls.catalog_name, + catalog_type=Catalog.Type.FILESET, + provider=cls.catalog_provider, + comment="", + properties={ + "filesystem-providers": "oss", + "gravitino.bypass.fs.oss.accessKeyId": cls.oss_access_key, + "gravitino.bypass.fs.oss.accessKeySecret": cls.oss_secret_key, + "gravitino.bypass.fs.oss.endpoint": cls.oss_endpoint, + "gravitino.bypass.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem", + }, + ) + catalog.as_schemas().create_schema( + schema_name=cls.schema_name, comment="", properties={} + ) + + cls.fileset_storage_location: str = ( + f"oss://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ) + cls.fileset_gvfs_location = ( + f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ) + catalog.as_fileset_catalog().create_fileset( + ident=cls.fileset_ident, + fileset_type=Fileset.Type.MANAGED, + comment=cls.fileset_comment, + storage_location=cls.fileset_storage_location, + properties=cls.fileset_properties, + ) + + cls.fs = OSSFileSystem( + key=cls.oss_access_key, + secret=cls.oss_secret_key, + endpoint=cls.oss_endpoint, + ) + + def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance): + # OSS will not create a directory, so the directory will not exist. + self.fs.mkdir(actual_dir) + self.assertFalse(self.fs.exists(actual_dir)) + self.assertFalse(gvfs_instance.exists(gvfs_dir)) + + def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance): + self.fs.makedirs(actual_dir) + self.assertFalse(self.fs.exists(actual_dir)) + self.assertFalse(gvfs_instance.exists(gvfs_dir)) + + def test_modified(self): + modified_dir = self.fileset_gvfs_location + "/test_modified" + modified_actual_dir = self.fileset_storage_location + "/test_modified" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(modified_dir, modified_actual_dir, fs) + # S3 only supports getting the `object` modify time, so the modified time will be None + # if it's a directory. + # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3') + # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3') + # >>> print(r) + # None + # self.assertIsNone(fs.modified(modified_dir)) + + # create a file under the dir 'modified_dir'. + file_path = modified_dir + "/test.txt" + fs.touch(file_path) + self.assertTrue(fs.exists(file_path)) + self.assertIsNotNone(fs.modified(file_path)) + + def test_rm(self): + rm_dir = self.fileset_gvfs_location + "/test_rm" + rm_actual_dir = self.fileset_storage_location + "/test_rm" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rm_dir, rm_actual_dir, fs) + + rm_file = self.fileset_gvfs_location + "/test_rm/test.file" + rm_actual_file = self.fileset_storage_location + "/test_rm/test.file" + fs.touch(rm_file) + self.assertTrue(self.fs.exists(rm_actual_file)) + self.assertTrue(fs.exists(rm_file)) + + # test delete file + fs.rm(rm_file) + self.assertFalse(fs.exists(rm_file)) + + # test delete dir with recursive = false + rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file" + rm_new_actual_file = self.fileset_storage_location + "/test_rm/test_new.file" + self.fs.touch(rm_new_actual_file) + self.assertTrue(self.fs.exists(rm_new_actual_file)) + self.assertTrue(fs.exists(rm_new_file)) + + def test_rmdir(self): + rmdir_dir = self.fileset_gvfs_location + "/test_rmdir" + rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs) + + rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file" + rmdir_actual_file = self.fileset_storage_location + "/test_rmdir/test.file" + self.fs.touch(rmdir_actual_file) + self.assertTrue(self.fs.exists(rmdir_actual_file)) + self.assertTrue(fs.exists(rmdir_file)) + + fs.rm_file(rmdir_file) + + def test_mkdir(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. + new_bucket = self.bucket_name + "1" + mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket) + mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket) + fs.mkdir(mkdir_dir, create_parents=True) + + with self.assertRaises(FileNotFoundError): + self.fs.exists(mkdir_actual_dir) + + self.assertFalse(fs.exists(mkdir_dir)) + + with self.assertRaises(FileNotFoundError): + self.fs.exists("oss://" + new_bucket) + + def test_makedirs(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. + new_bucket = self.bucket_name + "1" + mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket) + mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket) + + # it takes no effect. + fs.makedirs(mkdir_dir) + + with self.assertRaises(FileNotFoundError): + self.fs.exists(mkdir_actual_dir) + + self.assertFalse(fs.exists(mkdir_dir)) + with self.assertRaises(FileNotFoundError): + self.fs.exists("oss://" + new_bucket) + + def test_rm_file(self): + rm_file_dir = self.fileset_gvfs_location + "/test_rm_file" + rm_file_actual_dir = self.fileset_storage_location + "/test_rm_file" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs) + + rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file" + rm_file_actual_file = self.fileset_storage_location + "/test_rm_file/test.file" + self.fs.touch(rm_file_actual_file) + self.assertTrue(self.fs.exists(rm_file_actual_file)) + self.assertTrue(fs.exists(rm_file_file)) + + # test delete file + fs.rm_file(rm_file_file) + self.assertFalse(fs.exists(rm_file_file)) + + # test delete dir + fs.rm_file(rm_file_dir) + + def test_info(self): + info_dir = self.fileset_gvfs_location + "/test_info" + info_actual_dir = self.fileset_storage_location + "/test_info" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(info_dir, info_actual_dir, fs) + + info_file = self.fileset_gvfs_location + "/test_info/test.file" + info_actual_file = self.fileset_storage_location + "/test_info/test.file" + self.fs.touch(info_actual_file) + self.assertTrue(self.fs.exists(info_actual_file)) + + ## OSS info has different behavior than S3 info. For OSS info, the name of the + ## directory will have a trailing slash if it's a directory and the path + # does not end with a slash, while S3 info will not have a trailing + # slash if it's a directory. + + # >> > oss.info('bucket-xiaoyu/lisi') + # {'name': 'bucket-xiaoyu/lisi/', 'type': 'directory', + # 'size': 0, 'Size': 0, 'Key': 'bucket-xiaoyu/lisi/'} + # >> > oss.info('bucket-xiaoyu/lisi/') + # {'name': 'bucket-xiaoyu/lisi', 'size': 0, + # 'type': 'directory', 'Size': 0, + # 'Key': 'bucket-xiaoyu/lisi' + + # >> > s3.info('paimon-bucket/lisi'); + # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0, + # 'StorageClass': 'DIRECTORY'} + # >> > s3.info('paimon-bucket/lisi/'); + # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0, + # 'StorageClass': 'DIRECTORY'} + + dir_info = fs.info(info_dir) + self.assertEqual(dir_info["name"][:-1], info_dir[len("gvfs://") :]) + + file_info = fs.info(info_file) + self.assertEqual(file_info["name"], info_file[len("gvfs://") :])