Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lfs: drop dvc objects dependency #320

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ dependencies = [
"dulwich>=0.21.6",
"pygit2>=1.14.0",
"pygtrie>=2.3.2",
"fsspec>=2024.2.0",
"fsspec[tqdm]>=2024.2.0",
"pathspec>=0.9.0",
"asyncssh>=2.13.1,<3",
"funcy>=1.14",
"shortuuid>=0.5.0",
"dvc-objects>=4,<5",
"aiohttp-retry>=2.5.0",
"tqdm",
]

[project.urls]
Expand All @@ -54,6 +53,7 @@ tests = [
"types-certifi==2021.10.8.3",
"types-mock==5.1.0.2",
"types-paramiko==3.4.0.20240120",
"types-tqdm",
]
dev = [
"scmrepo[tests]",
Expand Down
36 changes: 27 additions & 9 deletions src/scmrepo/git/lfs/client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
from collections.abc import Iterable
from contextlib import AbstractContextManager
from multiprocessing import cpu_count
import os
import shutil
from collections.abc import Iterable, Iterator
from contextlib import AbstractContextManager, contextmanager, suppress
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, Optional

import aiohttp
from aiohttp_retry import ExponentialRetry, RetryClient
from dvc_objects.fs import localfs
from dvc_objects.fs.utils import as_atomic
from fsspec.asyn import _run_coros_in_chunks, sync_wrapper
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.implementations.http import HTTPFileSystem
Expand All @@ -31,7 +31,6 @@ class LFSClient(AbstractContextManager):

JSON_CONTENT_TYPE = "application/vnd.git-lfs+json"

_JOBS = 4 * cpu_count()
_REQUEST_TIMEOUT = 60
_SESSION_RETRIES = 5
_SESSION_BACKOFF_FACTOR = 0.1
Expand Down Expand Up @@ -153,14 +152,15 @@ async def _download(
storage: "LFSStorage",
objects: Iterable[Pointer],
callback: "Callback" = DEFAULT_CALLBACK,
batch_size: Optional[int] = None,
**kwargs,
):
async def _get_one(from_path: str, to_path: str, **kwargs):
with as_atomic(localfs, to_path, create_parents=True) as tmp_file:
with _as_atomic(to_path, create_parents=True) as tmp_file:
with callback.branched(from_path, tmp_file) as child:
await self._fs._get_file(
from_path, tmp_file, callback=child, **kwargs
) # pylint: disable=protected-access
)
callback.relative_update()

resp_data = await self._batch_request(objects, **kwargs)
Expand All @@ -178,9 +178,27 @@ async def _get_one(from_path: str, to_path: str, **kwargs):
to_path = storage.oid_to_path(obj.oid)
coros.append(_get_one(url, to_path, headers=headers))
for result in await _run_coros_in_chunks(
coros, batch_size=self._JOBS, return_exceptions=True
coros, batch_size=batch_size, return_exceptions=True
):
if isinstance(result, BaseException):
raise result

download = sync_wrapper(_download)


@contextmanager
def _as_atomic(to_info: str, create_parents: bool = False) -> Iterator[str]:
parent = os.path.dirname(to_info)
if create_parents:
os.makedirs(parent, exist_ok=True)

tmp_file = NamedTemporaryFile(dir=parent, delete=False)
tmp_file.close()
try:
yield tmp_file.name
except BaseException:
with suppress(FileNotFoundError):
os.unlink(tmp_file.name)
raise
else:
shutil.move(tmp_file.name, to_info)
115 changes: 111 additions & 4 deletions src/scmrepo/git/lfs/progress.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,114 @@
from typing import BinaryIO, Callable, Optional, Union
import logging
import sys
from typing import Any, BinaryIO, Callable, ClassVar, Optional, Union

from dvc_objects.fs.callbacks import TqdmCallback
from fsspec.callbacks import DEFAULT_CALLBACK, Callback
from fsspec.callbacks import DEFAULT_CALLBACK, Callback, TqdmCallback
from tqdm import tqdm

from scmrepo.progress import GitProgressEvent


class _Tqdm(tqdm):
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmrowla, do you think it's possible to use dvc's TqdmCallback and Tqdm implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible, but I do think there's value in scmrepo being able to handle reporting progress entirely separately from DVC

This _Tqdm subclass isn't strictly necessary in scmrepo, but the default fsspec TqdmCallback and default tqdm progressbars don't handle the branched nesting particularly well.

Copy link
Member

@skshetry skshetry Feb 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the default fsspec TqdmCallback and default tqdm progressbars don't handle the branched nesting particularly well.

This is only a UI thing, right? i.e. nothing breaks?

Can we implement something like this in scmrepo?

class LFSCallback:
   def branched():
       return DEFAULT_CALLBACK

which can be extended in dvc to return TqdmCallback which can be passed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that should work fine

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmrowla, is it just the lfs_fetch/fetch that should take progressbar from dvc? I see that the function is being called in many different places like open() and smudge/lfsfilter indirectly.

I guess there's no way to pass the callbacks to them, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From DVC's perspective it only matters for the high level fetch. Fetch can still be done from open/smudge because that's how the LFS CLI works.

You can manually do an explicit git lfs fetch to fetch LFS objects (which is the equivalent of what DVC does), and progress for the fetch will get reported similar to how git fetch progress is normally reported. But if you are doing a git checkout and there is an LFS object that has not been fetched that needs to be checked out, git + LFS will do the underlying LFS fetch for that object at that time, but it will just look like a slow/hanging checkout.
(You can configure LFS to write progress to a specific file instead of stderr/stdout for these situations where there is no equivalent CLI git progress reporting, but that's not implemented in scmrepo)

maximum-compatibility tqdm-based progressbars
"""

BAR_FMT_DEFAULT = (
"{percentage:3.0f}% {desc}|{bar}|"
"{postfix[info]}{n_fmt}/{total_fmt}"
" [{elapsed}<{remaining}, {rate_fmt:>11}]"
)
# nested bars should have fixed bar widths to align nicely
BAR_FMT_DEFAULT_NESTED = (
"{percentage:3.0f}%|{bar:10}|{desc:{ncols_desc}.{ncols_desc}}"
"{postfix[info]}{n_fmt}/{total_fmt}"
" [{elapsed}<{remaining}, {rate_fmt:>11}]"
)
BAR_FMT_NOTOTAL = "{desc}{bar:b}|{postfix[info]}{n_fmt} [{elapsed}, {rate_fmt:>11}]"
BYTES_DEFAULTS: ClassVar[dict[str, Any]] = {
"unit": "B",
"unit_scale": True,
"unit_divisor": 1024,
"miniters": 1,
}

def __init__( # noqa: PLR0913
self,
iterable=None,
disable=None,
level=logging.ERROR,
desc=None,
leave=False,
bar_format=None,
bytes=False, # noqa: A002
file=None,
total=None,
postfix=None,
**kwargs,
):
kwargs = kwargs.copy()
if bytes:
kwargs = {**self.BYTES_DEFAULTS, **kwargs}
else:
kwargs.setdefault("unit_scale", total > 999 if total else True)
if file is None:
file = sys.stderr
super().__init__(
iterable=iterable,
disable=disable,
leave=leave,
desc=desc,
bar_format="!",
lock_args=(False,),
total=total,
**kwargs,
)
self.postfix = postfix or {"info": ""}
if bar_format is None:
if self.__len__():
self.bar_format = (
self.BAR_FMT_DEFAULT_NESTED if self.pos else self.BAR_FMT_DEFAULT
)
else:
self.bar_format = self.BAR_FMT_NOTOTAL
else:
self.bar_format = bar_format
self.refresh()

def update_to(self, current, total=None):
if total:
self.total = total
self.update(current - self.n)

def close(self):
self.postfix["info"] = ""
# remove ETA (either unknown or zero); remove completed bar
self.bar_format = self.bar_format.replace("<{remaining}", "").replace(
"|{bar:10}|", " "
)
super().close()

@property
def format_dict(self):
"""inject `ncols_desc` to fill the display width (`ncols`)"""
d = super().format_dict
ncols = d["ncols"] or 80
# assumes `bar_format` has max one of ("ncols_desc" & "ncols_info")

meter = self.format_meter( # type: ignore[call-arg]
ncols_desc=1, ncols_info=1, **d
)
ncols_left = ncols - len(meter) + 1
ncols_left = max(ncols_left, 0)
if ncols_left:
d["ncols_desc"] = d["ncols_info"] = ncols_left
else:
# work-around for zero-width description
d["ncols_desc"] = d["ncols_info"] = 1
d["prefix"] = ""
return d


class LFSCallback(Callback):
"""Callback subclass to generate Git/LFS style progress."""

Expand Down Expand Up @@ -37,7 +140,11 @@ def _update_git(self):
def branched(self, path_1: Union[str, BinaryIO], path_2: str, **kwargs):
if self.git_progress:
return TqdmCallback(
bytes=True, desc=path_1 if isinstance(path_1, str) else path_2
tqdm_kwargs={
"desc": path_1 if isinstance(path_1, str) else path_2,
"bytes": True,
},
tqdm_cls=_Tqdm,
)
return DEFAULT_CALLBACK

Expand Down
5 changes: 4 additions & 1 deletion src/scmrepo/git/lfs/smudge.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@


def smudge(
storage: "LFSStorage", fobj: BinaryIO, url: Optional[str] = None
storage: "LFSStorage",
fobj: BinaryIO,
url: Optional[str] = None,
batch_size: Optional[int] = None,
) -> BinaryIO:
"""Wrap the specified binary IO stream and run LFS smudge if necessary."""
reader = io.BufferedReader(fobj) # type: ignore[arg-type]
Expand Down
6 changes: 4 additions & 2 deletions src/scmrepo/git/lfs/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ def fetch(
url: str,
objects: Collection[Pointer],
progress: Optional[Callable[["GitProgressEvent"], None]] = None,
batch_size: Optional[int] = None,
):
from .client import LFSClient

with LFSCallback.as_lfs_callback(progress) as cb:
cb.set_size(len(objects))
with LFSClient.from_git_url(url) as client:
client.download(self, objects, callback=cb)
client.download(self, objects, callback=cb, batch_size=batch_size)

def oid_to_path(self, oid: str):
return os.path.join(self.path, "objects", oid[0:2], oid[2:4], oid)
Expand All @@ -40,6 +41,7 @@ def open(
self,
obj: Union[Pointer, str],
fetch_url: Optional[str] = None,
batch_size: Optional[int] = None,
**kwargs,
) -> BinaryIO:
oid = obj if isinstance(obj, str) else obj.oid
Expand All @@ -50,7 +52,7 @@ def open(
if not fetch_url or not isinstance(obj, Pointer):
raise
try:
self.fetch(fetch_url, [obj])
self.fetch(fetch_url, [obj], batch_size=batch_size)
except BaseException as exc: # noqa: BLE001
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), path
Expand Down
Loading