From c25726c6191a74fb7a2a53f75745851430d32bdc Mon Sep 17 00:00:00 2001 From: xiaojiebao Date: Mon, 27 May 2024 15:28:50 +0800 Subject: [PATCH] fix code style --- .../gravitino/filesystem/gvfs.py | 301 ++++++++---------- 1 file changed, 130 insertions(+), 171 deletions(-) diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py index 078dd372811..98a49086c7a 100644 --- a/clients/client-python/gravitino/filesystem/gvfs.py +++ b/clients/client-python/gravitino/filesystem/gvfs.py @@ -7,20 +7,20 @@ import os import secrets import shutil -import fsspec import io -import regex - from contextlib import suppress from enum import Enum -from fsspec import utils +from typing import Dict, Tuple +import fsspec +import regex + +from fsspec.utils import infer_storage_options, mirror_from +from pyarrow.fs import FileType, FileSelector, FileSystem, HadoopFileSystem +from readerwriterlock import rwlock from gravitino.api.catalog import Catalog from gravitino.api.fileset import Fileset from gravitino.client.gravitino_client import GravitinoClient from gravitino.name_identifier import NameIdentifier -from pyarrow.fs import FileSystem -from readerwriterlock import rwlock -from typing import Dict, Tuple PROTOCOL_NAME = "gvfs" @@ -70,37 +70,37 @@ def __init__(self, server_uri, metalake_name, **kwargs): super().__init__(**kwargs) + @classmethod def _strip_protocol(cls, path): - ops = utils.infer_storage_options(path) + ops = infer_storage_options(path) path = ops["path"] - if path.startswith("//"): - # special case for "hdfs://path" (without the triple slash) - path = path[1:] return path + @property + def fsid(self): + raise RuntimeError("Unsupported method now.") + + def sign(self, path, expiration=None, **kwargs): + raise RuntimeError("Unsupported method now.") + def ls(self, path, detail=True, **kwargs): context: FilesetContext = self._get_fileset_context(path) if context.actual_path.startswith(StorageType.HDFS.value): - from pyarrow.fs import FileSelector - entries = [] for entry in context.fs.get_file_info( FileSelector(self._strip_protocol(context.actual_path)) ): entries.append( - self._convert_file_info_path_prefix( + self._convert_path_prefix( entry, context.fileset.storage_location(), self._get_virtual_location(context.name_identifier, True), ) ) return entries - else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." + ) def info(self, path, **kwargs): context: FilesetContext = self._get_fileset_context(path) @@ -108,44 +108,37 @@ def info(self, path, **kwargs): actual_path = self._strip_protocol(context.actual_path) [info] = context.fs.get_file_info([actual_path]) - return self._convert_file_info_path_prefix( + return self._convert_path_prefix( info, context.fileset.storage_location(), self._get_virtual_location(context.name_identifier, True), ) - else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." + ) def exists(self, path, **kwargs): try: self.info(path) except FileNotFoundError: return False - else: - return True - - def cp_file(self, src, dst, **kwargs): - src_identifier: NameIdentifier = self._extract_identifier(src) - dst_identifier: NameIdentifier = self._extract_identifier(dst) - if not src_identifier == dst_identifier: - raise ValueError( - "Destination file path identifier: `{}` should be same with src file path identifier: `{}`.".format( - dst_identifier, src_identifier - ) + return True + + def cp_file(self, path1, path2, **kwargs): + src_identifier: NameIdentifier = self._extract_identifier(path1) + dst_identifier: NameIdentifier = self._extract_identifier(path2) + if src_identifier != dst_identifier: + raise RuntimeError( + f"Destination file path identifier: `{dst_identifier}` should be same with src file path " + f"identifier: `{src_identifier}`." ) - src_context: FilesetContext = self._get_fileset_context(src) + src_context: FilesetContext = self._get_fileset_context(path1) if src_context.actual_path.startswith(StorageType.HDFS.value): if self._check_mount_single_file(src_context.fileset, src_context.fs): - raise ValueError( - "Cannot cp file of the fileset: {} which only mounts to a single file.".format( - src_identifier - ) + raise RuntimeError( + f"Cannot cp file of the fileset: {src_identifier} which only mounts to a single file." ) - dst_context: FilesetContext = self._get_fileset_context(dst) + dst_context: FilesetContext = self._get_fileset_context(path2) src_actual_path = self._strip_protocol(src_context.actual_path).rstrip("/") dst_actual_path = self._strip_protocol(dst_context.actual_path).rstrip("/") @@ -161,40 +154,33 @@ def cp_file(self, src, dst, **kwargs): dst_context.fs.delete_file(tmp_dst_name) raise else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - src_context.name_identifier - ) + raise RuntimeError( + f"Storage under the fileset: `{src_context.name_identifier}` doesn't support now." ) - def mv(self, src, dst, **kwargs): - src_identifier: NameIdentifier = self._extract_identifier(src) - dst_identifier: NameIdentifier = self._extract_identifier(dst) - if not src_identifier == dst_identifier: - raise ValueError( - "Destination file path identifier: `{}` should be same with src file path identifier: `{}`.".format( - dst_identifier, src_identifier - ) + def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): + src_identifier: NameIdentifier = self._extract_identifier(path1) + dst_identifier: NameIdentifier = self._extract_identifier(path2) + if src_identifier != dst_identifier: + raise RuntimeError( + f"Destination file path identifier: `{dst_identifier}`" + f" should be same with src file path identifier: `{src_identifier}`." ) - src_context: FilesetContext = self._get_fileset_context(src) + src_context: FilesetContext = self._get_fileset_context(path1) if src_context.actual_path.startswith(StorageType.HDFS.value): if self._check_mount_single_file(src_context.fileset, src_context.fs): - raise ValueError( - "Cannot cp file of the fileset: {} which only mounts to a single file.".format( - src_identifier - ) + raise RuntimeError( + f"Cannot cp file of the fileset: {src_identifier} which only mounts to a single file." ) - dst_context: FilesetContext = self._get_fileset_context(dst) + dst_context: FilesetContext = self._get_fileset_context(path2) src_actual_path = self._strip_protocol(src_context.actual_path).rstrip("/") dst_actual_path = self._strip_protocol(dst_context.actual_path).rstrip("/") src_context.fs.move(src_actual_path, dst_actual_path) else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - src_context.name_identifier - ) + raise RuntimeError( + f"Storage under the fileset: `{src_context.name_identifier}` doesn't support now." ) def rm_file(self, path): @@ -203,62 +189,61 @@ def rm_file(self, path): actual_path = self._strip_protocol(context.actual_path) context.fs.delete_file(actual_path) else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." ) - def rm(self, path, recursive=False, **kwargs): + def _rm(self, path): + raise RuntimeError("Deprecated method, use rm_file method instead.") + + def rm(self, path, recursive=False, maxdepth=None, **kwargs): context: FilesetContext = self._get_fileset_context(path) if context.actual_path.startswith(StorageType.HDFS.value): - from pyarrow.fs import FileType - actual_path = self._strip_protocol(context.actual_path).rstrip("/") [info] = context.fs.get_file_info([actual_path]) if info.type is FileType.Directory: if recursive: context.fs.delete_dir(actual_path) else: - raise ValueError("Cannot delete directories that recursive is `False`.") + raise RuntimeError( + "Cannot delete directories that recursive is `False`." + ) else: context.fs.delete_file(actual_path) else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." ) - def _open(self, path, mode="rb", block_size=None, seekable=True, **kwargs): + def _open( + self, + path, + mode="rb", + block_size=None, + autocommit=True, + cache_options=None, + **kwargs, + ): context: FilesetContext = self._get_fileset_context(path) if context.actual_path.startswith(StorageType.HDFS.value): if mode == "rb": - if seekable: - method = context.fs.open_input_file - else: - method = context.fs.open_input_stream + method = context.fs.open_input_stream elif mode == "wb": method = context.fs.open_output_stream elif mode == "ab": method = context.fs.open_append_stream else: - raise ValueError("Unsupported mode for Arrow FileSystem: {}.".format(mode)) + raise RuntimeError(f"Unsupported mode for Arrow FileSystem: {mode!r}.") _kwargs = {} - if mode != "rb" or not seekable: - _kwargs["compression"] = None stream = method(context.actual_path, **_kwargs) return HDFSFile( self, stream, context.actual_path, mode, block_size, **kwargs ) - else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." + ) def mkdir(self, path, create_parents=True, **kwargs): if create_parents: @@ -269,10 +254,8 @@ def mkdir(self, path, create_parents=True, **kwargs): actual_path = self._strip_protocol(context.actual_path) context.fs.create_dir(actual_path, recursive=False) else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." ) def makedirs(self, path, exist_ok=True): @@ -281,10 +264,8 @@ def makedirs(self, path, exist_ok=True): actual_path = self._strip_protocol(context.actual_path) context.fs.create_dir(actual_path) else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." ) def rmdir(self, path): @@ -293,38 +274,34 @@ def rmdir(self, path): actual_path = self._strip_protocol(context.actual_path) context.fs.delete_dir(actual_path) else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." ) + def created(self, path): + raise RuntimeError("Unsupported method now.") + def modified(self, path): context: FilesetContext = self._get_fileset_context(path) if context.actual_path.startswith(StorageType.HDFS.value): actual_path = self._strip_protocol(context.actual_path) return context.fs.get_file_info(actual_path).mtime - else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - context.name_identifier - ) - ) + raise RuntimeError( + f"Storage under the fileset: `{context.name_identifier}` doesn't support now." + ) def cat_file(self, path, start=None, end=None, **kwargs): - NotImplementedError() + raise RuntimeError("Unsupported method now.") - def get_file(self, rpath, lpath, **kwargs): - NotImplementedError() - - def _convert_file_info_path_prefix(self, info, storage_location, virtual_location): - from pyarrow.fs import FileType + def get_file(self, rpath, lpath, callback=None, outfile=None, **kwargs): + raise RuntimeError("Unsupported method now.") + def _convert_path_prefix(self, info, storage_location, virtual_location): actual_prefix = self._strip_protocol(storage_location) virtual_prefix = self._strip_protocol(virtual_location) if not info.path.startswith(actual_prefix): raise ValueError( - "Path {} does not start with valid prefix {}.".format(info.path, actual_prefix) + f"Path {info.path} does not start with valid prefix {actual_prefix}." ) if info.type is FileType.Directory: @@ -337,9 +314,7 @@ def _convert_file_info_path_prefix(self, info, storage_location, virtual_locatio kind = "other" return { - "name": "{}{}".format( - self._gvfs_prefix, info.path.replace(actual_prefix, virtual_prefix) - ), + "name": f"{self._gvfs_prefix}{info.path.replace(actual_prefix, virtual_prefix)}", "size": info.size, "type": kind, "mtime": info.mtime, @@ -375,8 +350,6 @@ def _get_fileset_context(self, virtual_path: str) -> FilesetContext: fileset: Fileset = self._load_fileset_from_server(identifier) storage_location = fileset.storage_location() if storage_location.startswith(StorageType.HDFS.value): - from pyarrow.fs import HadoopFileSystem - fs = HadoopFileSystem.from_uri(storage_location) actual_path = self._get_actual_path_by_ident( identifier, fileset, fs, virtual_path @@ -384,12 +357,9 @@ def _get_fileset_context(self, virtual_path: str) -> FilesetContext: self.cache[identifier] = (fileset, fs) context = FilesetContext(identifier, fileset, fs, actual_path) return context - else: - raise ValueError( - "Storage under the fileset: `{}` doesn't support now.".format( - identifier - ) - ) + raise ValueError( + f"Storage under the fileset: `{identifier}` doesn't support now." + ) finally: write_lock.release() @@ -404,10 +374,7 @@ def _extract_identifier(self, path) -> NameIdentifier: match.group(2), match.group(3), ) - else: - raise ValueError( - "path: `{}` doesn't contains valid identifier.".format(path) - ) + raise ValueError(f"path: `{path}` doesn't contains valid identifier.") def _load_fileset_from_server(self, identifier: NameIdentifier) -> Fileset: catalog: Catalog = self.client.load_catalog( @@ -427,53 +394,45 @@ def _get_actual_path_by_ident( with_scheme = virtual_path.startswith(self._gvfs_prefix) virtual_location = self._get_virtual_location(identifier, with_scheme) storage_location = fileset.storage_location() - try: - if self._check_mount_single_file(fileset, fs): - if virtual_path != virtual_location: - raise Exception( - "Path: {} should be same with the virtual location: {} when the fileset only mounts a single file.".format( - virtual_path, virtual_location - ) - ) - return storage_location - else: - return virtual_path.replace(virtual_location, storage_location, 1) - except Exception as e: - raise Exception( - "Cannot resolve path: {} to actual storage path, exception: {}".format( - virtual_path, str(e) + if self._check_mount_single_file(fileset, fs): + if virtual_path != virtual_location: + raise ValueError( + f"Path: {virtual_path} should be same with the virtual location: {virtual_location}" + " when the fileset only mounts a single file." ) - ) + return storage_location + return virtual_path.replace(virtual_location, storage_location, 1) def _get_virtual_location( self, identifier: NameIdentifier, with_scheme: bool ) -> str: - return "{}/{}/{}/{}".format( - self._gvfs_prefix if with_scheme is True else "", - identifier.namespace().level(1), - identifier.namespace().level(2), - identifier.name(), + prefix = self._gvfs_prefix if with_scheme is True else "" + return ( + f"{prefix}" + f"/{identifier.namespace().level(1)}" + f"/{identifier.namespace().level(2)}" + f"/{identifier.name()}" ) def _check_mount_single_file(self, fileset: Fileset, fs: FileSystem) -> bool: - from pyarrow.fs import FileType - - try: - [info] = fs.get_file_info( - [(self._strip_protocol(fileset.storage_location()))] - ) - if info.type is FileType.File: - return True - else: - return False - except Exception as e: - raise Exception( - "Cannot check whether the fileset: {} mounts a single file, exception: {}".format( - fileset.storage_location(), str(e) - ) - ) - - + [info] = fs.get_file_info([(self._strip_protocol(fileset.storage_location()))]) + return info.type is FileType.File + + +@mirror_from( + "stream", + [ + "read", + "seek", + "tell", + "write", + "readable", + "writable", + "close", + "size", + "seekable", + ], +) class HDFSFile(io.IOBase): def __init__(self, fs, stream, path, mode, block_size=None, **kwargs): self.path = path