[#5188] feat(python-client): Support s3 fileset in python client (#5209)

### What changes were proposed in this pull request?

Add support for S3 fileset in the Python client.


### Why are the changes needed?

Users need the Python client to read and write S3-backed filesets, matching the existing HDFS and GCS support.

Fix: #5188 

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Replaced the placeholders with a real S3 account and executed the following test:

`./gradlew :clients:client-python:test -PskipDockerTests=false`

<img width="1534" alt="image" src="https://github.com/user-attachments/assets/3d6267ce-8954-43e6-bc54-ac70998df9f9">
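
For reference, a minimal usage sketch of the new S3 support. This is not code from the PR: the server URI, metalake, catalog, and fileset names are placeholders, and the option keys are the new `GVFSConfig` entries added below.

```python
# Hedged sketch: all credential/endpoint values and names are placeholders.
from gravitino.filesystem import gvfs

fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
    options={
        "s3_access_key": "<access-key>",
        "s3_secret_key": "<secret-key>",
        "s3_endpoint": "<endpoint-url>",
    },
)
print(fs.ls("fileset/s3_catalog/schema/my_fileset"))
```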
yuqi1129 authored Oct 24, 2024
1 parent bea3934 commit cefe316
Showing 6 changed files with 565 additions and 104 deletions.
114 changes: 91 additions & 23 deletions clients/client-python/gravitino/filesystem/gvfs.py
@@ -14,7 +14,6 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import os
 from enum import Enum
 from pathlib import PurePosixPath
 from typing import Dict, Tuple
@@ -49,6 +48,7 @@ class StorageType(Enum):
     HDFS = "hdfs"
     LOCAL = "file"
     GCS = "gs"
+    S3A = "s3a"


 class FilesetContextPair:
@@ -314,7 +314,11 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):

         # convert the following to in

-        if storage_type in [StorageType.HDFS, StorageType.GCS]:
+        if storage_type in [
+            StorageType.HDFS,
+            StorageType.GCS,
+            StorageType.S3A,
+        ]:
             src_context_pair.filesystem().mv(
                 self._strip_storage_protocol(storage_type, src_actual_path),
                 self._strip_storage_protocol(storage_type, dst_actual_path),
@@ -336,6 +340,10 @@ def _rm(self, path):
             "Deprecated method, use `rm_file` method instead."
         )

+    def lazy_load_class(self, module_name, class_name):
+        module = importlib.import_module(module_name)
+        return getattr(module, class_name)
+
     def rm(self, path, recursive=False, maxdepth=None):
         """Remove a file or directory.
         :param path: Virtual fileset path
@@ -348,11 +356,17 @@ def rm(self, path, recursive=False, maxdepth=None):
         )
         actual_path = context_pair.actual_file_location()
         storage_type = self._recognize_storage_type(actual_path)
-        context_pair.filesystem().rm(
-            self._strip_storage_protocol(storage_type, actual_path),
-            recursive,
-            maxdepth,
-        )
+        fs = context_pair.filesystem()
+
+        # S3FileSystem doesn't support maxdepth
+        if isinstance(fs, self.lazy_load_class("s3fs", "S3FileSystem")):
+            fs.rm(self._strip_storage_protocol(storage_type, actual_path), recursive)
+        else:
+            fs.rm(
+                self._strip_storage_protocol(storage_type, actual_path),
+                recursive,
+                maxdepth,
+            )

     def rm_file(self, path):
         """Remove a file.
@@ -547,9 +561,11 @@ def _convert_actual_path(
         """

         # If the storage path starts with hdfs, gcs, or s3a, we should use the path as the prefix.
-        if storage_location.startswith(
-            f"{StorageType.HDFS.value}://"
-        ) or storage_location.startswith(f"{StorageType.GCS.value}://"):
+        if (
+            storage_location.startswith(f"{StorageType.HDFS.value}://")
+            or storage_location.startswith(f"{StorageType.GCS.value}://")
+            or storage_location.startswith(f"{StorageType.S3A.value}://")
+        ):
             actual_prefix = infer_storage_options(storage_location)["path"]
         elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
             actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
@@ -586,11 +602,34 @@ def _convert_actual_info(
         path = self._convert_actual_path(
             entry["name"], storage_location, virtual_location
         )
+
+        # Prefer 'mtime' when the entry has one (HDFS/GCS); otherwise fall
+        # back to 'LastModified' (S3/OSS); otherwise report no mtime.
+
+        if "mtime" in entry:
+            # HDFS and GCS
+            return {
+                "name": path,
+                "size": entry["size"],
+                "type": entry["type"],
+                "mtime": entry["mtime"],
+            }
+
+        if "LastModified" in entry:
+            # S3 and OSS
+            return {
+                "name": path,
+                "size": entry["size"],
+                "type": entry["type"],
+                "mtime": entry["LastModified"],
+            }
+
+        # Unknown
         return {
             "name": path,
             "size": entry["size"],
             "type": entry["type"],
-            "mtime": entry["mtime"],
+            "mtime": None,
         }

     def _get_fileset_context(self, virtual_path: str, operation: FilesetDataOperation):
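
The hunk above normalizes backend-specific listing entries to a common shape; a hedged standalone sketch of the same fallback logic (sample entries are invented for illustration):

```python
from datetime import datetime


def pick_mtime(entry: dict):
    # 'mtime' for HDFS/GCS entries, 'LastModified' for S3/OSS, else None.
    if "mtime" in entry:
        return entry["mtime"]
    return entry.get("LastModified")


hdfs_entry = {"name": "/tmp/a", "size": 1, "type": "file", "mtime": 1729728000.0}
s3_entry = {"name": "bucket/a", "size": 1, "type": "file",
            "LastModified": datetime(2024, 10, 24)}
assert pick_mtime(hdfs_entry) == 1729728000.0
assert pick_mtime(s3_entry) == datetime(2024, 10, 24)
assert pick_mtime({"name": "x", "size": 0, "type": "file"}) is None
```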
@@ -692,6 +731,8 @@ def _recognize_storage_type(path: str):
             return StorageType.LOCAL
         if path.startswith(f"{StorageType.GCS.value}://"):
             return StorageType.GCS
+        if path.startswith(f"{StorageType.S3A.value}://"):
+            return StorageType.S3A
         raise GravitinoRuntimeException(
             f"Storage type is not supported yet. Path: {path}"
         )
@@ -716,7 +757,7 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
         :param path: The path
         :return: The stripped path
         """
-        if storage_type in (StorageType.HDFS, StorageType.GCS):
+        if storage_type in (StorageType.HDFS, StorageType.GCS, StorageType.S3A):
             return path
         if storage_type == StorageType.LOCAL:
             return path[len(f"{StorageType.LOCAL.value}:") :]
@@ -791,7 +832,9 @@ def _get_filesystem(self, actual_file_location: str):
             elif storage_type == StorageType.LOCAL:
                 fs = LocalFileSystem()
             elif storage_type == StorageType.GCS:
-                fs = ArrowFSWrapper(self._get_gcs_filesystem())
+                fs = self._get_gcs_filesystem()
+            elif storage_type == StorageType.S3A:
+                fs = self._get_s3_filesystem()
             else:
                 raise GravitinoRuntimeException(
                     f"Storage type: `{storage_type}` is not supported yet."
@@ -802,22 +845,47 @@ def _get_filesystem(self, actual_file_location: str):
             write_lock.release()

     def _get_gcs_filesystem(self):
-        # get all keys from the options that start with 'gravitino.bypass.gcs.' and remove the prefix
-        gcs_options = {
-            key[len(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS) :]: value
-            for key, value in self._options.items()
-            if key.startswith(GVFSConfig.GVFS_FILESYSTEM_BY_PASS_GCS)
-        }
-
         # get 'service-account-key' from the options; if the key is not found, raise an exception
-        service_account_key_path = gcs_options.get(GVFSConfig.GVFS_FILESYSTEM_KEY_FILE)
+        service_account_key_path = self._options.get(
+            GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE
+        )
         if service_account_key_path is None:
             raise GravitinoRuntimeException(
                 "Service account key is not found in the options."
             )
-        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path
-
-        return importlib.import_module("pyarrow.fs").GcsFileSystem()
+        return importlib.import_module("gcsfs").GCSFileSystem(
+            token=service_account_key_path
+        )
+
+    def _get_s3_filesystem(self):
+        # get 'aws_access_key_id' from the options; if the key is not found, raise an exception
+        aws_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY)
+        if aws_access_key_id is None:
+            raise GravitinoRuntimeException(
+                "AWS access key id is not found in the options."
+            )
+
+        # get 'aws_secret_access_key' from the options; if the key is not found, raise an exception
+        aws_secret_access_key = self._options.get(
+            GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY
+        )
+        if aws_secret_access_key is None:
+            raise GravitinoRuntimeException(
+                "AWS secret access key is not found in the options."
+            )
+
+        # get 'aws_endpoint_url' from the options; if the key is not found, raise an exception
+        aws_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT)
+        if aws_endpoint_url is None:
+            raise GravitinoRuntimeException(
+                "AWS endpoint URL is not found in the options."
+            )
+
+        return importlib.import_module("s3fs").S3FileSystem(
+            key=aws_access_key_id,
+            secret=aws_secret_access_key,
+            endpoint_url=aws_endpoint_url,
+        )


 fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
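
Because the class is registered with fsspec here, the same filesystem can also be constructed through the fsspec registry. A sketch under the assumption that `PROTOCOL_NAME` resolves to `"gvfs"`; the connection values are placeholders, as in the PR description sketch:

```python
import fsspec

# Assumes PROTOCOL_NAME == "gvfs"; placeholder server/metalake/option values.
fs = fsspec.filesystem(
    "gvfs",
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
    options={"s3_access_key": "<access-key>",
             "s3_secret_key": "<secret-key>",
             "s3_endpoint": "<endpoint-url>"},
)
fs.ls("gvfs://fileset/s3_catalog/schema/my_fileset")
```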
8 changes: 5 additions & 3 deletions clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -32,6 +32,8 @@ class GVFSConfig:
     OAUTH2_PATH = "oauth2_path"
     OAUTH2_SCOPE = "oauth2_scope"

-    GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass"
-    GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs."
-    GVFS_FILESYSTEM_KEY_FILE = "service-account-key-path"
+    GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE = "gcs_service_account_key_path"
+
+    GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key"
+    GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_key"
+    GVFS_FILESYSTEM_S3_ENDPOINT = "s3_endpoint"
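
For reference, a sketch of an options dict built from the renamed GCS key and the new S3 keys (all values are placeholders):

```python
from gravitino.filesystem.gvfs_config import GVFSConfig

# Placeholder values; the keys are the constants defined in the diff above.
options = {
    GVFSConfig.GVFS_FILESYSTEM_GCS_SERVICE_KEY_FILE: "/path/to/gcs-service-key.json",
    GVFSConfig.GVFS_FILESYSTEM_S3_ACCESS_KEY: "<access-key>",
    GVFSConfig.GVFS_FILESYSTEM_S3_SECRET_KEY: "<secret-key>",
    GVFSConfig.GVFS_FILESYSTEM_S3_ENDPOINT: "<endpoint-url>",
}
```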
3 changes: 2 additions & 1 deletion clients/client-python/requirements.txt
@@ -23,4 +23,5 @@ readerwriterlock==1.0.9
 fsspec==2024.3.1
 pyarrow==15.0.2
 cachetools==5.3.3
-google-auth==2.35.0
+gcsfs==2024.3.1
+s3fs==2024.3.1