Skip to content

Commit

Permalink
Remove s3 related code.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqi1129 committed Oct 17, 2024
1 parent 11f9992 commit 4f00a2f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 61 deletions.
66 changes: 6 additions & 60 deletions clients/client-python/gravitino/filesystem/gvfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from fsspec.utils import infer_storage_options
from pyarrow.fs import HadoopFileSystem
from pyarrow.fs import GcsFileSystem
from pyarrow.fs import S3FileSystem

from readerwriterlock import rwlock
from gravitino.audit.caller_context import CallerContext, CallerContextHolder
Expand All @@ -51,7 +50,6 @@ class StorageType(Enum):
HDFS = "hdfs"
LOCAL = "file"
GCS = "gs"
S3 = "s3"


class FilesetContextPair:
Expand Down Expand Up @@ -317,7 +315,7 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):

# Use an `in` membership check to select the storage types handled by this branch.

if storage_type in [StorageType.HDFS, StorageType.GCS, StorageType.S3]:
if storage_type in [StorageType.HDFS, StorageType.GCS]:
src_context_pair.filesystem().mv(
self._strip_storage_protocol(storage_type, src_actual_path),
self._strip_storage_protocol(storage_type, dst_actual_path),
Expand Down Expand Up @@ -548,13 +546,11 @@ def _convert_actual_path(
:param virtual_location: Virtual location
:return: A virtual path
"""
# If the storage path starts with hdfs, gcs, s3, s3a or s3n, we should use the path as the prefix.

if (
storage_location.startswith(f"{StorageType.HDFS.value}://")
or storage_location.startswith(f"{StorageType.GCS.value}://")
or storage_location.startswith(f"{StorageType.S3.value}://")
):
# If the storage path starts with hdfs or gcs, we should use the path as the prefix.
if storage_location.startswith(
f"{StorageType.HDFS.value}://"
) or storage_location.startswith(f"{StorageType.GCS.value}://"):
actual_prefix = infer_storage_options(storage_location)["path"]
elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
Expand Down Expand Up @@ -697,8 +693,6 @@ def _recognize_storage_type(path: str):
return StorageType.LOCAL
if path.startswith(f"{StorageType.GCS.value}://"):
return StorageType.GCS
if path.startswith(f"{StorageType.S3.value}://"):
return StorageType.S3
raise GravitinoRuntimeException(
f"Storage type doesn't support now. Path:{path}"
)
Expand All @@ -723,7 +717,7 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
:param path: The path
:return: The stripped path
"""
if storage_type in (StorageType.HDFS, StorageType.S3, StorageType.GCS):
if storage_type in (StorageType.HDFS, StorageType.GCS):
return path
if storage_type == StorageType.LOCAL:
return path[len(f"{StorageType.LOCAL.value}:") :]
Expand Down Expand Up @@ -798,8 +792,6 @@ def _get_filesystem(self, actual_file_location: str):
fs = LocalFileSystem()
elif storage_type == StorageType.GCS:
fs = ArrowFSWrapper(self._get_gcs_filesystem())
elif storage_type == StorageType.S3:
fs = ArrowFSWrapper(self._get_s3_filesystem())
else:
raise GravitinoRuntimeException(
f"Storage type: `{storage_type}` doesn't support now."
Expand All @@ -823,54 +815,8 @@ def _get_gcs_filesystem(self):
raise GravitinoRuntimeException(
"Service account key is not found in the options."
)

# scopes = ["https://www.googleapis.com/auth/cloud-platform"]
# credentials = service_account.Credentials.from_service_account_file(
# service_account_key_path, scopes=scopes)
# credentials.refresh(Request())

# access_token = credentials.token
# expiration = credentials.expiry

# return GcsFileSystem(access_token=access_token,
# credential_token_expiration=expiration)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key_path
return GcsFileSystem()

def _get_s3_filesystem(self):
    """Build a PyArrow ``S3FileSystem`` from the S3 bypass options.

    Every entry of ``self._options`` whose key starts with
    ``GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3`` is collected with that prefix
    removed. The access key id, secret access key and endpoint URL are then
    read from the collected entries.

    :return: An ``S3FileSystem`` built from the configured credentials and endpoint
    :raises GravitinoRuntimeException: If ``aws_access_key_id``,
        ``aws_secret_access_key`` or ``aws_endpoint_url`` is absent from the options
    """
    prefix = GVFSConfig.GVFS_FILESYSTEM_BY_PASS_S3

    # Keep only the S3 bypass options, keyed by their name without the prefix.
    s3_options = {
        key[len(prefix) :]: value
        for key, value in self._options.items()
        if key.startswith(prefix)
    }

    aws_access_key_id = s3_options.get("aws_access_key_id")
    if aws_access_key_id is None:
        raise GravitinoRuntimeException(
            "AWS access key id is not found in the options."
        )

    aws_secret_access_key = s3_options.get("aws_secret_access_key")
    if aws_secret_access_key is None:
        raise GravitinoRuntimeException(
            "AWS secret access key is not found in the options."
        )

    aws_endpoint_url = s3_options.get("aws_endpoint_url")
    if aws_endpoint_url is None:
        raise GravitinoRuntimeException(
            "AWS endpoint url is not found in the options."
        )

    # pyarrow.fs.S3FileSystem names its credential parameters `access_key`
    # and `secret_key`; `key` / `secret` are the s3fs spellings and are not
    # accepted by the PyArrow constructor.
    return S3FileSystem(
        access_key=aws_access_key_id,
        secret_key=aws_secret_access_key,
        endpoint_override=aws_endpoint_url,
    )


fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
1 change: 0 additions & 1 deletion clients/client-python/gravitino/filesystem/gvfs_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ class GVFSConfig:

GVFS_FILESYSTEM_BY_PASS = "gravitino.bypass"
GVFS_FILESYSTEM_BY_PASS_GCS = "gravitino.bypass.gcs."
GVFS_FILESYSTEM_BY_PASS_S3 = "gravitino.bypass.s3."

0 comments on commit 4f00a2f

Please sign in to comment.