support gvfs
xiaojiebao committed May 23, 2024
1 parent 506963e commit 2420ab8
Showing 5 changed files with 328 additions and 2 deletions.
1 change: 1 addition & 0 deletions clients/client-python/gravitino/__init__.py
@@ -13,3 +13,4 @@
 from gravitino.client.gravitino_admin_client import GravitinoAdminClient
 from gravitino.client.gravitino_metalake import GravitinoMetalake
 from gravitino.name_identifier import NameIdentifier
+from gravitino.filesystem import gvfs
4 changes: 4 additions & 0 deletions clients/client-python/gravitino/filesystem/__init__.py
@@ -0,0 +1,4 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
318 changes: 318 additions & 0 deletions clients/client-python/gravitino/filesystem/gvfs.py
@@ -0,0 +1,318 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""

import errno
import os

import fsspec
import io
import regex

from enum import Enum
from fsspec import utils
from gravitino.api.catalog import Catalog
from gravitino.api.fileset import Fileset
from gravitino.client.gravitino_client import GravitinoClient
from gravitino.name_identifier import NameIdentifier
from pyarrow.fs import HadoopFileSystem
from readerwriterlock import rwlock
from typing import Dict

PROTOCOL_NAME = "gvfs"


class StorageType(Enum):
    LOCAL = "file://"
    HDFS = "hdfs://"


class FilesetContext:
    def __init__(
        self,
        name_identifier: NameIdentifier,
        fileset: Fileset,
        fs,
        actual_path,
    ):
        self.name_identifier = name_identifier
        self.fileset = fileset
        self.fs = fs
        self.actual_path = actual_path

    def get_name_identifier(self):
        return self.name_identifier

    def get_fileset(self):
        return self.fileset

    def get_fs(self):
        return self.fs

    def get_actual_path(self):
        return self.actual_path


class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
    protocol = PROTOCOL_NAME
    _gvfs_prefix = "gvfs://fileset"
    _identifier_pattern = regex.compile(
        "^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$"
    )
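    # For example, "gvfs://fileset/catalog1/schema1/fileset1/sub/dir" matches the
    # pattern above with groups ("catalog1", "schema1", "fileset1"); a trailing
    # sub-path is accepted but not captured, and the scheme prefix is optional.
    # The names here are illustrative.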

    def __init__(self, server_uri, metalake_name, auth_type, extra_conf=None, **kwargs):
        self.pars = (server_uri, metalake_name, auth_type, extra_conf)
        self.metalake = metalake_name
        self.client = GravitinoClient(uri=server_uri, metalake_name=metalake_name)
        self.cache: Dict[NameIdentifier, FilesetContext] = {}
        self.cache_lock = rwlock.RWLockFair()

        super().__init__(**kwargs)

    @classmethod
    def _strip_protocol(cls, path):
        ops = utils.infer_storage_options(path)
        path = ops["path"]
        if path.startswith("//"):
            # special case for "hdfs://path" (without the triple slash)
            path = path[1:]
        return path

    def ls(self, path, detail=True, **kwargs):
        context: FilesetContext = self._get_fileset_context(path)
        if context.actual_path.startswith(StorageType.HDFS.value):
            from pyarrow.fs import FileSelector

            entries = []
            for entry in context.fs.get_file_info(
                FileSelector(self._strip_protocol(context.actual_path))
            ):
                entries.append(
                    self._convert_file_info_path_prefix(
                        entry,
                        context.fileset.storage_location(),
                        self._get_virtual_location(context.name_identifier, True),
                    )
                )
            return entries
        else:
            raise ValueError(
                "Storage location: `{}` is not supported now.".format(
                    context.actual_path
                )
            )

    def info(self, path, **kwargs):
        raise NotImplementedError("`info` is not implemented yet.")

    def exists(self, path, **kwargs):
        raise NotImplementedError("`exists` is not implemented yet.")

    def cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError("`cp_file` is not implemented yet.")

    def mv(self, path1, path2, **kwargs):
        raise NotImplementedError("`mv` is not implemented yet.")

    def rm_file(self, path):
        raise NotImplementedError("`rm_file` is not implemented yet.")

    def rm(self, path, recursive=False, **kwargs):
        raise NotImplementedError("`rm` is not implemented yet.")

    def _open(self, path, mode="rb", block_size="default", **kwargs):
        raise NotImplementedError("`_open` is not implemented yet.")

    def mkdir(self, path, create_parents=True, **kwargs):
        raise NotImplementedError("`mkdir` is not implemented yet.")

    def makedirs(self, path, exist_ok=True):
        raise NotImplementedError("`makedirs` is not implemented yet.")

    def rmdir(self, path):
        raise NotImplementedError("`rmdir` is not implemented yet.")

    def modified(self, path):
        raise NotImplementedError("`modified` is not implemented yet.")

    def cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError("`cat_file` is not implemented yet.")

    def get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError("`get_file` is not implemented yet.")

    def _convert_file_info_path_prefix(self, info, storage_location, virtual_location):
        from pyarrow.fs import FileType

        actual_prefix = self._strip_protocol(storage_location)
        virtual_prefix = self._strip_protocol(virtual_location)
        if not info.path.startswith(actual_prefix):
            raise ValueError(
                "Path {} does not start with prefix {}".format(info.path, actual_prefix)
            )

        if info.type is FileType.Directory:
            kind = "directory"
        elif info.type is FileType.File:
            kind = "file"
        elif info.type is FileType.NotFound:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), info.path)
        else:
            kind = "other"

        return {
            "name": "{}{}".format(
                self._gvfs_prefix, info.path.replace(actual_prefix, virtual_prefix)
            ),
            "size": info.size,
            "type": kind,
            "mtime": info.mtime,
        }

    def _get_fileset_context(self, virtual_path) -> FilesetContext:
        identifier: NameIdentifier = self._extract_identifier(virtual_path)
        # Fast path: return the cached context under the read lock if present.
        read_lock = self.cache_lock.gen_rlock()
        try:
            read_lock.acquire()
            context = self.cache.get(identifier)
            if context is not None:
                return context
        finally:
            read_lock.release()

        # Slow path: re-check under the write lock, then load the fileset from
        # the server and cache the resulting context.
        write_lock = self.cache_lock.gen_wlock()
        try:
            write_lock.acquire()
            context = self.cache.get(identifier)
            if context is not None:
                return context
            fileset: Fileset = self._load_fileset_from_server(identifier)
            storage_location = fileset.storage_location()
            if storage_location.startswith(StorageType.HDFS.value):
                fs = HadoopFileSystem.from_uri(storage_location)
                actual_path = self._get_actual_path_by_ident(
                    identifier, fs, fileset, virtual_path
                )
                context = FilesetContext(identifier, fileset, fs, actual_path)
                self.cache[identifier] = context
                return context
            else:
                raise ValueError(
                    "Storage location: `{}` is not supported now.".format(
                        storage_location
                    )
                )
        finally:
            write_lock.release()

    def _extract_identifier(self, path) -> NameIdentifier:
        if path is None or len(path) == 0:
            raise ValueError("The path to be extracted cannot be null or empty.")
        match = self._identifier_pattern.match(path)
        if match and len(match.groups()) == 3:
            return NameIdentifier.of_fileset(
                self.metalake,
                match.group(1),
                match.group(2),
                match.group(3),
            )
        else:
            raise ValueError(
                "path: `{}` doesn't contain a valid identifier.".format(path)
            )

    def _load_fileset_from_server(self, identifier: NameIdentifier) -> Fileset:
        catalog: Catalog = self.client.load_catalog(
            NameIdentifier.of_catalog(
                identifier.namespace().level(0), identifier.namespace().level(1)
            )
        )
        return catalog.as_fileset_catalog().load_fileset(identifier)

    def _get_actual_path_by_ident(
        self, identifier: NameIdentifier, fs, fileset: Fileset, virtual_path
    ) -> str:
        with_scheme = virtual_path.startswith(self._gvfs_prefix)
        virtual_location = self._get_virtual_location(identifier, with_scheme)
        storage_location = fileset.storage_location()
        try:
            if self._check_mount_single_file(fileset, fs):
                if virtual_path != virtual_location:
                    raise Exception(
                        "Path: {} must be the same as the virtual location: {} when the fileset only mounts a single file.".format(
                            virtual_path, virtual_location
                        )
                    )
                return storage_location
            else:
                return virtual_path.replace(virtual_location, storage_location, 1)
        except Exception as e:
            raise Exception(
                "Cannot resolve path: {} to actual storage path, exception: {}".format(
                    virtual_path, str(e)
                )
            ) from e

    def _get_virtual_location(
        self, identifier: NameIdentifier, with_scheme: bool
    ) -> str:
        return "{}/{}/{}/{}".format(
            self._gvfs_prefix if with_scheme else "",
            identifier.namespace().level(1),
            identifier.namespace().level(2),
            identifier.name(),
        )

    def _check_mount_single_file(self, fileset: Fileset, fs: HadoopFileSystem) -> bool:
        from pyarrow.fs import FileType

        try:
            [info] = fs.get_file_info(
                [self._strip_protocol(fileset.storage_location())]
            )
            return info.type is FileType.File
        except Exception as e:
            raise Exception(
                "Cannot check whether the fileset: {} mounts a single file, exception: {}".format(
                    fileset.storage_location(), str(e)
                )
            ) from e


class HDFSFile(io.IOBase):
    def __init__(self, fs, stream, path, mode, block_size=None, **kwargs):
        self.path = path
        self.mode = mode

        self.fs = fs
        self.stream = stream

        self.blocksize = self.block_size = block_size
        self.kwargs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, *args):
        return self.close()


fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
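
With the implementation registered, the "gvfs" protocol becomes available through fsspec as soon as the gravitino package is imported (the change to gravitino/__init__.py above does this implicitly). Below is a minimal usage sketch, assuming a Gravitino server at a hypothetical address and an HDFS-backed fileset; only `ls` is implemented in this commit, and all names are illustrative:

import fsspec
import gravitino  # importing the package registers the "gvfs" protocol

# Hypothetical connection parameters; adjust to your deployment.
fs = fsspec.filesystem(
    "gvfs",
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
    auth_type=None,
)

# Virtual paths follow gvfs://fileset/{catalog}/{schema}/{fileset}/{sub_path}
# and are resolved to the fileset's actual HDFS storage location.
for entry in fs.ls("gvfs://fileset/my_catalog/my_schema/my_fileset", detail=True):
    print(entry["name"], entry["type"], entry["size"])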
2 changes: 1 addition & 1 deletion clients/client-python/gravitino/name_identifier.py
@@ -276,7 +276,7 @@ def __eq__(self, other):
         return self._namespace == other._namespace and self._name == other._name
 
     def __hash__(self):
-        return hash(self._namespace, self._name)
+        return hash((self._namespace, self._name))
 
     def __str__(self):
         if self.has_namespace():
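The `__hash__` fix above corrects a real bug: Python's built-in `hash()` accepts exactly one argument, so the old call raised a `TypeError` at runtime. Hashing a tuple of the two fields combines them in one hash, consistent with `__eq__`. A quick illustration with hypothetical values:

namespace, name = ("metalake", "catalog", "schema"), "fileset1"
# hash(namespace, name)   # TypeError: hash() takes exactly one argument (2 given)
hash((namespace, name))   # hashes the (namespace, name) pair as a single tuple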
5 changes: 4 additions & 1 deletion clients/client-python/requirements.txt
@@ -3,4 +3,7 @@
 
 # the tools to publish the python client to Pypi
 requests
-dataclasses-json
+dataclasses-json
+readerwriterlock
+fsspec>=2024.3.1
+pyarrow>=15.0.2
