Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add git to filesystem source 301 #954

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,12 @@ def protocol(self) -> str:

def on_resolved(self) -> None:
url = urlparse(self.bucket_url)
if not url.path and not url.netloc:
raise ConfigurationValueError(
"File path or netloc missing. Field bucket_url of FilesystemClientConfiguration"
" must contain valid url with a path or host:password component."
)
if url.scheme not in ("gitpythonfs", "github", "git"):
if not url.path and not url.netloc:
raise ConfigurationValueError(
"File path or netloc missing. Field bucket_url of FilesystemClientConfiguration"
" must contain valid url with a path or host:password component."
)
# this is just a path in a local file system
if url.path == self.bucket_url:
url = url._replace(scheme="file")
Expand Down
124 changes: 96 additions & 28 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
)
from urllib.parse import urlparse

from fsspec import AbstractFileSystem, register_implementation
from fsspec.registry import known_implementations, register_implementation
from fsspec import AbstractFileSystem
from fsspec.core import url_to_fs

from dlt import version
Expand Down Expand Up @@ -48,21 +49,20 @@ class FileItem(TypedDict, total=False):
file_content: Optional[bytes]


# Map of protocol to mtime resolver
# we only need to support a small finite set of protocols
MTIME_DISPATCH = {
"s3": lambda f: ensure_pendulum_datetime(f["LastModified"]),
"adl": lambda f: ensure_pendulum_datetime(f["LastModified"]),
"az": lambda f: ensure_pendulum_datetime(f["last_modified"]),
"gcs": lambda f: ensure_pendulum_datetime(f["updated"]),
"file": lambda f: ensure_pendulum_datetime(f["mtime"]),
"memory": lambda f: ensure_pendulum_datetime(f["created"]),
"gdrive": lambda f: ensure_pendulum_datetime(f["modifiedTime"]),
DEFAULT_MTIME_FIELD_NAME = "mtime"
MTIME_FIELD_NAMES = {
"file": "mtime",
"s3": "LastModified",
"adl": "LastModified",
"az": "last_modified",
"gcs": "updated",
"memory": "created",
"gdrive": "modifiedTime",
}
# Support aliases
MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"]
MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"]
MTIME_FIELD_NAMES["gs"] = MTIME_FIELD_NAMES["gcs"]
MTIME_FIELD_NAMES["s3a"] = MTIME_FIELD_NAMES["s3"]
MTIME_FIELD_NAMES["abfs"] = MTIME_FIELD_NAMES["az"]

# Map of protocol to a filesystem type
CREDENTIALS_DISPATCH: Dict[str, Callable[[FilesystemConfiguration], DictStrAny]] = {
Expand All @@ -76,6 +76,52 @@ class FileItem(TypedDict, total=False):
"azure": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
}

CUSTOM_IMPLEMENTATIONS = {
"dummyfs": {
deanja marked this conversation as resolved.
Show resolved Hide resolved
"fq_classname": "dummyfs.DummyFileSystem",
"errtxt": "Dummy only",
},
"gdrive": {
"fq_classname": "dlt.common.storages.fsspecs.google_drive.GoogleDriveFileSystem",
"errtxt": "Please install gdrivefs to access GoogleDriveFileSystem",
},
"gitpythonfs": {
"fq_classname": "dlt.common.storages.fsspecs.gitpythonfs.GitPythonFileSystem",
"errtxt": "Please install gitpythonfs to access GitPythonFileSystem",
},
}


def register_implementation_in_fsspec(protocol: str) -> None:
"""Dynamically register a filesystem implementation with fsspec.

This is useful if the implementation is not officially known in the fsspec codebase.

The registration's scope is the current process.

Is a no-op if an implementation is already registerd for the given protocol.

Args:
protocol (str): The protocol to register.

Returns: None
"""
if protocol in known_implementations:
return

if protocol not in CUSTOM_IMPLEMENTATIONS:
raise ValueError(
f"Unknown protocol: '{protocol}' is not an fsspec known "
"implementations nor a dlt custom implementations."
)

registration_details = CUSTOM_IMPLEMENTATIONS[protocol]
register_implementation(
protocol,
registration_details["fq_classname"],
errtxt=registration_details["errtxt"],
)


def fsspec_filesystem(
protocol: str,
Expand Down Expand Up @@ -112,11 +158,6 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny:
fs_kwargs: DictStrAny = {"use_listings_cache": False, "listings_expiry_time": 60.0}
credentials = CREDENTIALS_DISPATCH.get(protocol, lambda _: {})(config)

if protocol == "gdrive":
from dlt.common.storages.fsspecs.google_drive import GoogleDriveFileSystem

register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem")

if config.kwargs is not None:
fs_kwargs.update(config.kwargs)
if config.client_kwargs is not None:
Expand All @@ -142,6 +183,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
Returns: (fsspec filesystem, normalized url)
"""
fs_kwargs = prepare_fsspec_args(config)
register_implementation_in_fsspec(config.protocol)
deanja marked this conversation as resolved.
Show resolved Hide resolved

try:
return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore
Expand All @@ -157,29 +199,31 @@ class FileItemDict(DictStrAny):
def __init__(
self,
mapping: FileItem,
credentials: Optional[Union[FileSystemCredentials, AbstractFileSystem]] = None,
fs_details: Optional[Union[AbstractFileSystem, FilesystemConfiguration, FileSystemCredentials]] = None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have simply added FilesystemConfiguration as an additional type, and tidied up the naming. Is that far enough for this PR?

Possible fast-follower to this PR:

  1. Limit the types that can be passed to FileItemDict constructor. I get the feeling this is an important internal API. The following calls are currently in use:
  • AbstractFileSystem, in tests/common/storages/test_fsspec_filesystem.py
  • FileSystemCredentials, in tests/common/storages/utils.py
  • None, in verified-sources sources/inbox/init.py
    Could there be other usage in dlt example projects?

):
"""Create a dictionary with the filesystem client.

Args:
mapping (FileItem): The file item TypedDict.
credentials (Optional[FileSystemCredentials], optional): The credentials to the
fs_details (Optional[AbstractFileSystem, FilesystemConfiguration, FileSystemCredentials], optional): Details to help get a
filesystem. Defaults to None.
"""
self.credentials = credentials
self.fs_details = fs_details
super().__init__(**mapping)

@property
def fsspec(self) -> AbstractFileSystem:
"""The filesystem client is based on the given credentials.
"""The filesystem client is based on the given details.

Returns:
AbstractFileSystem: The fsspec client.
AbstractFileSystem: An fsspec client.
"""
if isinstance(self.credentials, AbstractFileSystem):
return self.credentials
if isinstance(self.fs_details, AbstractFileSystem):
return self.fs_details
elif isinstance(self.fs_details, FilesystemConfiguration):
return fsspec_from_config(self.fs_details)[0]
else:
return fsspec_filesystem(self["file_url"], self.credentials)[0]
return fsspec_filesystem(self["file_url"], self.fs_details)[0]

def open( # noqa: A003
self,
Expand Down Expand Up @@ -261,6 +305,28 @@ def guess_mime_type(file_name: str) -> Sequence[str]:
return type_


def extract_mtime(file_metadata: Dict[str, Any], protocol: str = None) -> pendulum.DateTime:
"""Extract the modification time from file listing metadata.

If a protocol is not provided, None or not a known protocol,
then default field name `mtime` is tried. If there's no `mtime` field,
the current time is returned.

`mtime` is used for the "file" fsspec implementation and our custom fsspec implementations.
`mtime` is common terminology in unix-like systems.

Args:
file_metadata (Dict[str, Any]): The file metadata.
protocol (str) [Optional]: The protocol.

Returns:
pendulum.DateTime: The latest modification time. Defaults to `now()` if no suitable
field is found in the metadata.
"""
field_name = MTIME_FIELD_NAMES.get(protocol, DEFAULT_MTIME_FIELD_NAME)
return ensure_pendulum_datetime(file_metadata.get(field_name, pendulum.now()))


def glob_files(
fs_client: AbstractFileSystem, bucket_url: str, file_glob: str = "**"
) -> Iterator[FileItem]:
Expand Down Expand Up @@ -307,12 +373,14 @@ def glob_files(
path=posixpath.join(bucket_url_parsed.path, file_name)
).geturl()

modification_date = extract_mtime(md, bucket_url_parsed.scheme)

mime_type, encoding = guess_mime_type(file_name)
yield FileItem(
file_name=file_name,
file_url=file_url,
mime_type=mime_type,
encoding=encoding,
modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md),
modification_date=modification_date,
size_in_bytes=int(md["size"]),
)
Loading
Loading