[#5188] feat(python-client): Support s3 fileset in python client #5209
Changes from 118 commits
d2447a2
7e5a8b5
f53c5ef
36fedcd
e93fba5
b1e04b6
db00e65
c793582
013f5cb
dba5753
16dfc73
278fcd8
3fb55ad
cd04666
d0bf13e
ffaa064
32d7f3d
d82bf76
dfdb772
8708a8a
ba9f8fa
dae99f7
e22053b
4fb89e0
e5746c0
b2d7bed
380717b
f4041ec
66247ab
3cfb94f
7d1150f
608081b
9edfe82
3079bf0
da49e60
b621d89
05dd006
9d5b8dc
e58f9a0
c521daf
46e996a
da0b7ca
e9ccda4
ba1fe5f
4ffe389
7c44a57
992ba0a
5dbca5f
f27520a
e29e47b
bc1e76f
8a9d3bf
2115e31
c2e55d4
557aa02
5c3fa5c
408eca7
a02065d
8762bae
dc7a915
c230991
dc54880
7ecc040
da46321
27bc2ab
017c42e
9dc0f5a
41ff00d
1fee1e4
1789bd2
2ee1709
05e5d20
8f28211
35cba1e
bcf2f12
f25a37d
a3da011
3517996
27a911a
e34dbea
6bae7e5
fe13f5e
0181632
3ff9eef
2ce660c
f0fa87b
d2921a8
dc68dd1
6431e2f
242888f
3ec2dcc
f754997
2cdfb35
67dbc3a
70a545e
4d54acf
15bbf99
cfcc544
acf51e1
11f9992
4f00a2f
b9ef8f0
9f65fb5
4defcc6
3a907f4
76912b7
4478673
5a194df
c8b5c7c
6ff9353
1dac0f0
f592289
7069b6b
1281e72
f022d6e
0d2ccab
55633e8
217cc5f
6958aa8
8b9b8d7
b4d5728
0306ac5
a35a40d
cfd8a89
a131564
34e3ff4
2cc234a
ba0237e
4df5ea1
4e49e6e
015b788
c414b4d
798b4d1
aa96f63
804622c
@@ -49,6 +49,8 @@ class StorageType(Enum):
    HDFS = "hdfs"
    LOCAL = "file"
    GCS = "gs"
    S3A = "s3a"
    S3 = "s3"


class FilesetContextPair:
@@ -314,7 +316,12 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):

        # convert the following to in

        if storage_type in [StorageType.HDFS, StorageType.GCS]:
        if storage_type in [
            StorageType.HDFS,
            StorageType.GCS,
            StorageType.S3,
            StorageType.S3A,
        ]:
            src_context_pair.filesystem().mv(
                self._strip_storage_protocol(storage_type, src_actual_path),
                self._strip_storage_protocol(storage_type, dst_actual_path),
@@ -547,9 +554,12 @@ def _convert_actual_path(
        """

        # If the storage path starts with hdfs, gcs, we should use the path as the prefix.
        if storage_location.startswith(
            f"{StorageType.HDFS.value}://"
        ) or storage_location.startswith(f"{StorageType.GCS.value}://"):
        if (
            storage_location.startswith(f"{StorageType.HDFS.value}://")
            or storage_location.startswith(f"{StorageType.GCS.value}://")
            or storage_location.startswith(f"{StorageType.S3.value}://")
            or storage_location.startswith(f"{StorageType.S3A.value}://")
        ):
            actual_prefix = infer_storage_options(storage_location)["path"]
        elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
            actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
@@ -692,6 +702,10 @@ def _recognize_storage_type(path: str):
            return StorageType.LOCAL
        if path.startswith(f"{StorageType.GCS.value}://"):
            return StorageType.GCS
        if path.startswith(f"{StorageType.S3A.value}://"):
            return StorageType.S3A
        if path.startswith(f"{StorageType.S3.value}://"):
            return StorageType.S3
        raise GravitinoRuntimeException(
            f"Storage type doesn't support now. Path:{path}"
        )
@@ -716,7 +730,12 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
        :param path: The path
        :return: The stripped path
        """
        if storage_type in (StorageType.HDFS, StorageType.GCS):
        if storage_type in (
            StorageType.HDFS,
            StorageType.GCS,
            StorageType.S3A,
            StorageType.S3,
        ):
            return path
        if storage_type == StorageType.LOCAL:
            return path[len(f"{StorageType.LOCAL.value}:") :]
@@ -792,6 +811,8 @@ def _get_filesystem(self, actual_file_location: str):
            fs = LocalFileSystem()
        elif storage_type == StorageType.GCS:
            fs = ArrowFSWrapper(self._get_gcs_filesystem())
        elif storage_type in (StorageType.S3A, StorageType.S3):
            fs = ArrowFSWrapper(self._get_s3_filesystem())
        else:
            raise GravitinoRuntimeException(
                f"Storage type: `{storage_type}` doesn't support now."
@@ -819,5 +840,40 @@ def _get_gcs_filesystem(self):

        return importlib.import_module("pyarrow.fs").GcsFileSystem()

    def _get_s3_filesystem(self):
        # get All keys from the options that start with 'gravitino.bypass.s3.' and remove the prefix
        s3_options = {
            key[len(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3) :]: value
            for key, value in self._options.items()
            if key.startswith(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3)
        }

        # get 'aws_access_key_id' from s3_options, if the key is not found, throw an exception
        aws_access_key_id = s3_options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY)
        if aws_access_key_id is None:
            raise GravitinoRuntimeException(
                "AWS access key id is not found in the options."
            )

        # get 'aws_secret_access_key' from s3_options, if the key is not found, throw an exception
        aws_secret_access_key = s3_options.get(GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY)
        if aws_secret_access_key is None:
            raise GravitinoRuntimeException(
                "AWS secret access key is not found in the options."
            )

        # get 'aws_endpoint_url' from s3_options, if the key is not found, throw an exception
        aws_endpoint_url = s3_options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
        if aws_endpoint_url is None:
            raise GravitinoRuntimeException(
                "AWS endpoint url is not found in the options."
            )

        return importlib.import_module("pyarrow.fs").S3FileSystem(
Review thread on this line:

Sorry I didn't notice this before: GCS and S3 also have fsspec implementations (https://github.com/fsspec/gcsfs, https://github.com/fsspec/s3fs). How did you weigh the choice of PyArrow's implementation here?

PyArrow's implementation provides a uniform API to users, for example, combined with […]. I have viewed the implementation by […]. Considering the efficiency brought by […].

In fact, PyArrow officially supports only a limited number of storage systems. If you need to add a storage system, you need to modify the Arrow source code. HDFS uses PyArrow because fsspec actually also calls PyArrow there, so it is almost the only choice. For other storage, PyArrow may not be the only choice. My advice is not to be restricted by the current selection; we should make the best choice in terms of performance and interface adaptability.

I agree with this point, and I also noticed that the set of filesystems PyArrow supports is very limited. Due to time limitations, I have not completed a comprehensive survey of it. Thanks for the suggestion; I will modify the code accordingly.

@xloya
            access_key=aws_access_key_id,
            secret_key=aws_secret_access_key,
            endpoint_override=aws_endpoint_url,
        )


fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
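For the fsspec-vs-PyArrow discussion above, here is a minimal, hypothetical sketch of what the same credential wiring could look like with fsspec's s3fs instead of pyarrow.fs.S3FileSystem. It is not part of this PR; the option keys ("access-key", "secret-key", "endpoint") mirror the GVFSConfig constants in the diff, and an s3fs filesystem would plug into gvfs directly as an fsspec implementation, without ArrowFSWrapper.

# Hypothetical alternative, not the PR's implementation: build an fsspec s3fs
# filesystem from the same stripped option keys used by _get_s3_filesystem.
import importlib


def build_s3fs_filesystem(s3_options: dict):
    # s3_options is assumed to already have the "gravitino.bypass.s3." prefix removed.
    s3fs = importlib.import_module("s3fs")
    return s3fs.S3FileSystem(
        key=s3_options.get("access-key"),
        secret=s3_options.get("secret-key"),
        client_kwargs={"endpoint_url": s3_options.get("endpoint")},
    )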
@@ -35,3 +35,8 @@ class GVFSConfig:
    GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass"
    GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs."
    GVFS_FILESYSTEM_KEY_FILE = "service-account-key-path"

    GVFS_FILESYSTEM_BY_PASS_S3 = "gravitino.bypass.s3."
Review thread on this line:

I was wondering why we need a "gravitino.bypass.s3." prefix for the GCS and AWS configurations. The configuration keys mentioned above are essential for the client to work, so it may not be good to put them behind a bypass prefix. What do you think?

OK, let me think a bit.

I have removed the prefix […].
    GVFS_FILESYSTEM_S3_ACCESS_KEY = "access-key"
    GVFS_FILESYSTEM_S3_SECRET_KEY = "secret-key"
    GVFS_FILESYSTEM_S3_ENDPOINT = "endpoint"
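As a usage note, here is a hedged sketch (not taken from the PR) of how a client would pass these keys as they stand at this point in the review, i.e. still behind the gravitino.bypass.s3. prefix; the credentials, endpoint, metalake, and fileset names below are placeholders.

from gravitino import gvfs
from gravitino.filesystem.gvfs_config import GVFSConfig

# Placeholder credentials and endpoint; keys use the prefixed form from this revision.
options = {
    f"{GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3}{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": "my-access-key",
    f"{GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3}{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": "my-secret-key",
    f"{GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3}{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": "http://s3.example.com",
}

fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
    options=options,
)
fs.ls("gvfs://fileset/my_catalog/my_schema/my_fileset")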
@@ -0,0 +1,166 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import os
from random import randint
import unittest

from fsspec.implementations.arrow import ArrowFSWrapper
from pyarrow.fs import S3FileSystem

from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
from gravitino import (
    gvfs,
    GravitinoClient,
    Catalog,
    Fileset,
)
from gravitino.exceptions.base import GravitinoRuntimeException
from gravitino.filesystem.gvfs_config import GVFSConfig

logger = logging.getLogger(__name__)

@unittest.skip("This test requires an S3 service account")
class TestGvfsWithS3(TestGvfsWithHDFS):
    # Before running this test, please make sure aws-bundle-x.jar has been
    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
    s3_access_key = "your_access_key"
    s3_secret_key = "your_secret_key"
    s3_endpoint = "your_endpoint"
    bucket_name = "your_bucket_name"

    metalake_name: str = "TestGvfsWithS3_metalake" + str(randint(1, 10000))

    def setUp(self):
        self.options = {
            f"{GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3}{GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY}": self.s3_access_key,
            f"{GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3}{GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY}": self.s3_secret_key,
            f"{GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3}{GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT}": self.s3_endpoint,
        }

    def tearDown(self):
        self.options = {}

    @classmethod
    def setUpClass(cls):
        cls._get_gravitino_home()

        cls.hadoop_conf_path = f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
        # restart the server
        cls.restart_server()
        # create entity
        cls._init_test_entities()

    @classmethod
    def tearDownClass(cls):
        cls._clean_test_data()
        # reset server conf in case of other ITs like HDFS has changed it and fail
        # to reset it
        cls._reset_conf(cls.config, cls.hadoop_conf_path)
        # restart server
        cls.restart_server()

    # clear all config in the conf_path
    @classmethod
    def _reset_conf(cls, config, conf_path):
        logger.info("Reset %s.", conf_path)
        if not os.path.exists(conf_path):
            raise GravitinoRuntimeException(f"Conf file is not found at `{conf_path}`.")
        filtered_lines = []
        with open(conf_path, mode="r", encoding="utf-8") as file:
            origin_lines = file.readlines()

        for line in origin_lines:
            line = line.strip()
            if line.startswith("#"):
                # append annotations directly
                filtered_lines.append(line + "\n")

        with open(conf_path, mode="w", encoding="utf-8") as file:
            for line in filtered_lines:
                file.write(line)

    @classmethod
    def _init_test_entities(cls):
        cls.gravitino_admin_client.create_metalake(
            name=cls.metalake_name, comment="", properties={}
        )
        cls.gravitino_client = GravitinoClient(
            uri="http://localhost:8090", metalake_name=cls.metalake_name
        )

        cls.config = {}
        cls.conf = {}
        catalog = cls.gravitino_client.create_catalog(
            name=cls.catalog_name,
            catalog_type=Catalog.Type.FILESET,
            provider=cls.catalog_provider,
            comment="",
            properties={
                "filesystem-providers": "s3",
                "gravitino.bypass.fs.s3a.access.key": cls.s3_access_key,
Review thread on this line:

Also for the server side, maybe we should clearly define some configurations rather than using "gravitino.bypass." for everything. I have to think a bit about this; can you please also think about it from the user's side?

@jerryshao
"gravitino.bypass.fs.s3a.secret.key": cls.s3_secret_key, | ||
"gravitino.bypass.fs.s3a.endpoint": cls.s3_endpoint, | ||
}, | ||
) | ||
catalog.as_schemas().create_schema( | ||
schema_name=cls.schema_name, comment="", properties={} | ||
) | ||
|
||
cls.fileset_storage_location: str = ( | ||
f"s3a://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" | ||
) | ||
cls.fileset_gvfs_location = ( | ||
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" | ||
) | ||
catalog.as_fileset_catalog().create_fileset( | ||
ident=cls.fileset_ident, | ||
fileset_type=Fileset.Type.MANAGED, | ||
comment=cls.fileset_comment, | ||
storage_location=cls.fileset_storage_location, | ||
properties=cls.fileset_properties, | ||
) | ||
|
||
arrow_s3_fs = S3FileSystem( | ||
access_key=cls.s3_access_key, | ||
secret_key=cls.s3_secret_key, | ||
endpoint_override=cls.s3_endpoint, | ||
) | ||
cls.fs = ArrowFSWrapper(arrow_s3_fs) | ||
|
||
def test_modified(self): | ||
modified_dir = self.fileset_gvfs_location + "/test_modified" | ||
modified_actual_dir = self.fileset_storage_location + "/test_modified" | ||
fs = gvfs.GravitinoVirtualFileSystem( | ||
server_uri="http://localhost:8090", | ||
metalake_name=self.metalake_name, | ||
options=self.options, | ||
**self.conf, | ||
) | ||
self.fs.mkdir(modified_actual_dir) | ||
self.assertTrue(self.fs.exists(modified_actual_dir)) | ||
self.assertTrue(fs.exists(modified_dir)) | ||
|
||
self.assertIsNone(fs.modified(modified_dir)) | ||
|
||
# create a file under the dir 'modified_dir'. | ||
file_path = modified_dir + "/test.txt" | ||
fs.touch(file_path) | ||
self.assertTrue(fs.exists(file_path)) | ||
self.assertIsNotNone(fs.modified(file_path)) |
Review thread:

Why do we add two schemes, "s3a" and "s3"?

I have the same question. We only use the s3a scheme in the S3FileSystemProvider (https://github.com/apache/gravitino/blob/main/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java#L44); is there any case that will use the s3 scheme?

I was concerned about instances where the location starts with s3, not s3a. As clarified by @xloya, there seems to be only one entrance, and Gravitino is the only way a fileset can be created, so we can safely remove s3 here.
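To make the s3-versus-s3a point concrete, below is a small self-contained sketch; it is a standalone re-implementation that mirrors the StorageType enum and the _recognize_storage_type logic from the diff above, not the PR's actual module. Because matching includes the "://" separator, the two entries never overlap, and dropping StorageType.S3 would simply make plain s3:// locations raise.

from enum import Enum


class StorageType(Enum):
    HDFS = "hdfs"
    LOCAL = "file"
    GCS = "gs"
    S3A = "s3a"
    S3 = "s3"


def recognize_storage_type(path: str) -> StorageType:
    # "s3a://..." only matches S3A and "s3://..." only matches S3, since the
    # scheme check includes "://"; the order of the two checks does not matter.
    for storage_type in (StorageType.HDFS, StorageType.GCS, StorageType.S3A, StorageType.S3):
        if path.startswith(f"{storage_type.value}://"):
            return storage_type
    raise ValueError(f"Unsupported storage path: {path}")


print(recognize_storage_type("s3a://bucket/catalog/schema/fileset"))  # StorageType.S3A
print(recognize_storage_type("s3://bucket/catalog/schema/fileset"))   # StorageType.S3
# If fileset locations are only ever created through Gravitino's S3FileSystemProvider
# (which uses s3a://), the StorageType.S3 entry is never hit and could be removed,
# in which case the second call above would raise instead.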