From 2420ab89292a8e06cde55d9f5383ee4fc4d453d1 Mon Sep 17 00:00:00 2001 From: xiaojiebao Date: Thu, 23 May 2024 15:24:06 +0800 Subject: [PATCH] support gvfs --- clients/client-python/gravitino/__init__.py | 1 + .../gravitino/filesystem/__init__.py | 4 + .../gravitino/filesystem/gvfs.py | 318 ++++++++++++++++++ .../gravitino/name_identifier.py | 2 +- clients/client-python/requirements.txt | 5 +- 5 files changed, 328 insertions(+), 2 deletions(-) create mode 100644 clients/client-python/gravitino/filesystem/__init__.py create mode 100644 clients/client-python/gravitino/filesystem/gvfs.py diff --git a/clients/client-python/gravitino/__init__.py b/clients/client-python/gravitino/__init__.py index 24db62a279b..21876e0600a 100644 --- a/clients/client-python/gravitino/__init__.py +++ b/clients/client-python/gravitino/__init__.py @@ -13,3 +13,4 @@ from gravitino.client.gravitino_admin_client import GravitinoAdminClient from gravitino.client.gravitino_metalake import GravitinoMetalake from gravitino.name_identifier import NameIdentifier +from gravitino.filesystem import gvfs diff --git a/clients/client-python/gravitino/filesystem/__init__.py b/clients/client-python/gravitino/filesystem/__init__.py new file mode 100644 index 00000000000..5779a3ad252 --- /dev/null +++ b/clients/client-python/gravitino/filesystem/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py new file mode 100644 index 00000000000..c1a90910b0a --- /dev/null +++ b/clients/client-python/gravitino/filesystem/gvfs.py @@ -0,0 +1,318 @@ +""" +Copyright 2024 Datastrato Pvt Ltd. +This software is licensed under the Apache License version 2. +""" + +import errno +import os + +import fsspec +import io +import regex + +from enum import Enum +from fsspec import utils +from gravitino.api.catalog import Catalog +from gravitino.api.fileset import Fileset +from gravitino.client.gravitino_client import GravitinoClient +from gravitino.name_identifier import NameIdentifier +from pyarrow.fs import HadoopFileSystem +from readerwriterlock import rwlock +from typing import Dict + +PROTOCOL_NAME = "gvfs" + + +class StorageType(Enum): + LOCAL = "file://" + HDFS = "hdfs://" + + +class FilesetContext: + def __init__( + self, + name_identifier: NameIdentifier, + fileset: Fileset, + fs, + actual_path, + ): + self.name_identifier = name_identifier + self.fileset = fileset + self.fs = fs + self.actual_path = actual_path + + def get_name_identifier(self): + return self.name_identifier + + def get_fileset(self): + return self.fileset + + def get_fs(self): + return self.fs + + def get_actual_path(self): + return self.actual_path + + +class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem): + protocol = PROTOCOL_NAME + _gvfs_prefix = "gvfs://fileset" + _identifier_pattern = regex.compile( + "^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$" + ) + + def __init__(self, server_uri, metalake_name, auth_type, extra_conf=None, **kwargs): + self.pars = (server_uri, metalake_name, auth_type, extra_conf) + self.metalake = metalake_name + self.client = GravitinoClient(uri=server_uri, metalake_name=metalake_name) + self.cache: Dict[NameIdentifier, FilesetContext] = {} + self.cache_lock = rwlock.RWLockFair() + + super().__init__(**kwargs) + + def _strip_protocol(cls, path): + ops = utils.infer_storage_options(path) + path = ops["path"] + if path.startswith("//"): + # special case for "hdfs://path" (without the triple slash) + path = path[1:] + return path + + def ls(self, path, detail=True, **kwargs): + context: FilesetContext = self._get_fileset_context(path) + if context.actual_path.startswith(StorageType.HDFS.value): + from pyarrow.fs import FileSelector + + entries = [] + for entry in context.fs.get_file_info( + FileSelector(self._strip_protocol(context.actual_path)) + ): + entries.append( + self._convert_file_info_path_prefix( + entry, + context.fileset.storage_location(), + self._get_virtual_location(context.name_identifier, True), + ) + ) + return entries + else: + raise ValueError( + "Storage location: `{}` doesn't support now.".format( + context.actual_path + ) + ) + + def info(self, path, **kwargs): + print("This is `info` method") + NotImplementedError() + + def exists(self, path, **kwargs): + print("This is `exists` method") + NotImplementedError() + + def cp_file(self, path1, path2, **kwargs): + print("This is `cp_file` method") + NotImplementedError() + + def mv(self, path1, path2, **kwargs): + print("This is `mv` method") + NotImplementedError() + + def rm_file(self, path): + print("This is `rm_file` method") + NotImplementedError() + + def rm(self, path, recursive=False, **kwargs): + print("This is `rm` method") + NotImplementedError() + + def _open(self, path, mode="rb", block_size="default", **kwargs): + print("This is `_open` method") + NotImplementedError() + + def mkdir(self, path, create_parents=True, **kwargs): + print("This is `mkdir` method") + NotImplementedError() + + def makedirs(self, path, exist_ok=True): + print("This is `makedirs` method") + NotImplementedError() + + def rmdir(self, path): + print("This is `rmdir` method") + NotImplementedError() + + def modified(self, path): + print("This is `modified` method") + NotImplementedError() + + def cat_file(self, path, start=None, end=None, **kwargs): + print("This is `cat_file` method") + NotImplementedError() + + def get_file(self, rpath, lpath, **kwargs): + print("This is `get_file` method") + NotImplementedError() + + def _convert_file_info_path_prefix(self, info, storage_location, virtual_location): + from pyarrow.fs import FileType + + actual_prefix = self._strip_protocol(storage_location) + virtual_prefix = self._strip_protocol(virtual_location) + if not info.path.startswith(actual_prefix): + raise ValueError( + "Path {} does not start with prefix {}".format(info.path, actual_prefix) + ) + + if info.type is FileType.Directory: + kind = "directory" + elif info.type is FileType.File: + kind = "file" + elif info.type is FileType.NotFound: + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), info.path) + else: + kind = "other" + + return { + "name": "{}{}".format( + self._gvfs_prefix, info.path.replace(actual_prefix, virtual_prefix) + ), + "size": info.size, + "type": kind, + "mtime": info.mtime, + } + + def _get_fileset_context(self, virtual_path) -> FilesetContext: + identifier: NameIdentifier = self._extract_identifier(virtual_path) + read_lock = self.cache_lock.gen_rlock() + try: + read_lock.acquire() + context = self.cache.get(identifier) + if context is not None: + return context + finally: + read_lock.release() + + write_lock = self.cache_lock.gen_wlock() + try: + write_lock.acquire() + context = self.cache.get(identifier) + if context is not None: + return context + fileset: Fileset = self._load_fileset_from_server(identifier) + storage_location = fileset.storage_location() + if storage_location.startswith(StorageType.HDFS.value): + from pyarrow.fs import HadoopFileSystem + + fs = HadoopFileSystem.from_uri(storage_location) + actual_path = self._get_actual_path_by_ident( + identifier, fs, fileset, virtual_path + ) + context = FilesetContext(identifier, fileset, fs, actual_path) + self.cache[identifier] = context + return context + else: + raise ValueError( + "Storage location: `{}` doesn't contains valid identifier.".format( + storage_location + ) + ) + finally: + write_lock.release() + + def _extract_identifier(self, path) -> NameIdentifier: + if path is None or len(path) == 0: + raise ValueError("path which need be extracted cannot be null or empty.") + match = self._identifier_pattern.match(path) + if match and len(match.groups()) == 3: + return NameIdentifier.of_fileset( + self.metalake, + match.group(1), + match.group(2), + match.group(3), + ) + else: + raise ValueError( + "path: `{}` doesn't contains valid identifier.".format(path) + ) + + def _load_fileset_from_server(self, identifier: NameIdentifier) -> Fileset: + catalog: Catalog = self.client.load_catalog( + NameIdentifier.of_catalog( + identifier.namespace().level(0), identifier.namespace().level(1) + ) + ) + return catalog.as_fileset_catalog().load_fileset(identifier) + + def _get_actual_path_by_ident( + self, identifier: NameIdentifier, fs, fileset: Fileset, virtual_path + ) -> str: + with_scheme = virtual_path.startswith(self._gvfs_prefix) + virtual_location = self._get_virtual_location(identifier, with_scheme) + storage_location = fileset.storage_location() + try: + if self._check_mount_single_file(fileset, fs): + if virtual_path != virtual_location: + raise Exception( + "Path: {} should be same with the virtual location: {} when the fileset only mounts a single file.".format( + virtual_path, virtual_location + ) + ) + return storage_location + else: + return virtual_path.replace(virtual_location, storage_location, 1) + except Exception as e: + raise Exception( + "Cannot resolve path: {} to actual storage path, exception: {}".format( + virtual_path, str(e) + ) + ) + + def _get_virtual_location( + self, identifier: NameIdentifier, with_scheme: bool + ) -> str: + return "{}/{}/{}/{}".format( + self._gvfs_prefix if with_scheme is True else "", + identifier.namespace().level(1), + identifier.namespace().level(2), + identifier.name(), + ) + + def _check_mount_single_file(self, fileset: Fileset, fs: HadoopFileSystem) -> bool: + from pyarrow.fs import FileType + + try: + [info] = fs.get_file_info( + [(self._strip_protocol(fileset.storage_location()))] + ) + if info.type is FileType.File: + return True + else: + return False + except Exception as e: + raise Exception( + "Cannot check whether the fileset: {} mounts a single file, exception: {}".format( + fileset.storage_location(), str(e) + ) + ) + + +class HDFSFile(io.IOBase): + def __init__(self, fs, stream, path, mode, block_size=None, **kwargs): + self.path = path + self.mode = mode + + self.fs = fs + self.stream = stream + + self.blocksize = self.block_size = block_size + self.kwargs = kwargs + + def __enter__(self): + return self + + def __exit__(self, *args): + return self.close() + + +fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem) diff --git a/clients/client-python/gravitino/name_identifier.py b/clients/client-python/gravitino/name_identifier.py index 5ce77640437..f894a859402 100644 --- a/clients/client-python/gravitino/name_identifier.py +++ b/clients/client-python/gravitino/name_identifier.py @@ -276,7 +276,7 @@ def __eq__(self, other): return self._namespace == other._namespace and self._name == other._name def __hash__(self): - return hash(self._namespace, self._name) + return hash((self._namespace, self._name)) def __str__(self): if self.has_namespace(): diff --git a/clients/client-python/requirements.txt b/clients/client-python/requirements.txt index e4e72675b81..08b9ca80529 100644 --- a/clients/client-python/requirements.txt +++ b/clients/client-python/requirements.txt @@ -3,4 +3,7 @@ # the tools to publish the python client to Pypi requests -dataclasses-json \ No newline at end of file +dataclasses-json +readerwriterlock +fsspec>=2024.3.1 +pyarrow>=15.0.2 \ No newline at end of file