From 88660f6a66e7d58b051a7624145a1c385f27250f Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Mon, 15 Nov 2021 03:24:42 +0200 Subject: [PATCH] dvc: use transfer in checkout/doctor/odb All three of those use more or less the same logic for testing different link types and maybe then using them. So now instead of having 3 implementations, we'll have just one. Filesystem `transfer` now will not try to catch particular errno's, but instead will just ignore any OSError and move down to the next link type to try out, and if the last one fails - it will raise the last exception, so if it is a common problem - we'll be able to see it. This patch also removes `odb.confirmed_cache_type`, as it was caching link type that is not really a cache-level thing, but rather a checkout thing, that should only be cached for particular objects we are linking, as multiple objects could be linked to different filesystems from the same odb (e.g. external local output on hdd and local data on your ssd). Next step here will be to introduce metadata diff, so that we can get rid of temporary Link wrapper and introduce Transfer and corresponding transfer config, that will know what links to use and could have some other things like callbacks, so we don't continue passing dozens of kwargs along the stack. Continuing from #6963 Fixes #6980 --- dvc/fs/local.py | 5 +- dvc/fs/utils.py | 157 +++++++++++++----- dvc/info.py | 44 ++--- dvc/objects/checkout.py | 124 ++++---------- dvc/objects/db/base.py | 1 - dvc/objects/db/slow_link_detection.py | 8 +- tests/func/test_add.py | 6 +- tests/unit/remote/test_slow_link_detection.py | 8 +- 8 files changed, 175 insertions(+), 178 deletions(-) diff --git a/dvc/fs/local.py b/dvc/fs/local.py index 2e191fb0cc..51bdfc0b54 100644 --- a/dvc/fs/local.py +++ b/dvc/fs/local.py @@ -153,8 +153,9 @@ def info(self, path): def put_file( self, from_file, to_info, callback=DEFAULT_CALLBACK, **kwargs ): - makedirs(self.path.parent(to_info), exist_ok=True) - tmp_file = tmp_fname(to_info) + parent = self.path.parent(to_info) + makedirs(parent, exist_ok=True) + tmp_file = self.path.join(parent, tmp_fname()) copyfile(from_file, tmp_file, callback=callback) os.replace(tmp_file, to_info) diff --git a/dvc/fs/utils.py b/dvc/fs/utils.py index 1ddf8099e0..d408b83803 100644 --- a/dvc/fs/utils.py +++ b/dvc/fs/utils.py @@ -1,75 +1,99 @@ import errno import logging import os -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List, Optional from .base import RemoteActionNotImplemented from .local import LocalFileSystem if TYPE_CHECKING: - from dvc.types import AnyPath - from .base import BaseFileSystem + from .types import AnyPath logger = logging.getLogger(__name__) def _link( + link: "AnyPath", from_fs: "BaseFileSystem", - from_info: "AnyPath", + from_path: "AnyPath", to_fs: "BaseFileSystem", - to_info: "AnyPath", - hardlink: bool = False, + to_path: "AnyPath", ) -> None: if not isinstance(from_fs, type(to_fs)): raise OSError(errno.EXDEV, "can't link across filesystems") - links = ["reflink"] + ["hardlink"] if hardlink else [] + try: + func = getattr(to_fs, link) + func(from_path, to_path) + except (OSError, AttributeError, RemoteActionNotImplemented) as exc: + # NOTE: there are so many potential error codes that a link call can + # throw, that is safer to just catch them all and try another link. + raise OSError( + errno.ENOTSUP, f"'{link}' is not supported by {type(from_fs)}" + ) from exc + + +def _copy( + from_fs: "BaseFileSystem", + from_path: "AnyPath", + to_fs: "BaseFileSystem", + to_path: "AnyPath", +) -> None: + if isinstance(from_fs, LocalFileSystem): + return to_fs.upload(from_path, to_path) + + if isinstance(to_fs, LocalFileSystem): + return from_fs.download_file(from_path, to_path) + + with from_fs.open(from_path, mode="rb") as fobj: + size = from_fs.getsize(from_path) + return to_fs.upload(fobj, to_path, total=size) + + +def _try_links( + links: List["AnyPath"], + from_fs: "BaseFileSystem", + from_path: "AnyPath", + to_fs: "BaseFileSystem", + to_path: "AnyPath", +) -> None: + error = None while links: - link = links.pop(0) - try: - func = getattr(to_fs, link) - except AttributeError: - continue + link = links[0] + + if link == "copy": + return _copy(from_fs, from_path, to_fs, to_path) try: - return func(from_info, to_info) - except RemoteActionNotImplemented: - continue + return _link(link, from_fs, from_path, to_fs, to_path) except OSError as exc: - if exc.errno not in [ - errno.EXDEV, - errno.ENOTSUP, - errno.ENOSYS, # https://github.com/iterative/dvc/issues/6962 - ]: + if exc.errno not in [errno.ENOTSUP, errno.EXDEV]: raise + error = exc + + del links[0] - raise OSError(errno.ENOTSUP, "reflink and hardlink are not supported") + raise OSError( + errno.ENOTSUP, "no more link types left to try out" + ) from error def transfer( from_fs: "BaseFileSystem", - from_info: "AnyPath", + from_path: "AnyPath", to_fs: "BaseFileSystem", - to_info: "AnyPath", + to_path: "AnyPath", hardlink: bool = False, + links: Optional[List["AnyPath"]] = None, ) -> None: try: - try: - return _link(from_fs, from_info, to_fs, to_info, hardlink=hardlink) - except OSError as exc: - if exc.errno not in [errno.EXDEV, errno.ENOTSUP]: - raise - - if isinstance(from_fs, LocalFileSystem): - return to_fs.upload(from_info, to_info) - - if isinstance(to_fs, LocalFileSystem): - return from_fs.download_file(from_info, to_info) - - with from_fs.open(from_info, mode="rb") as fobj: - size = from_fs.getsize(from_info) - return to_fs.upload(fobj, to_info, total=size) + assert not (hardlink and links) + if hardlink: + links = links or ["reflink", "hardlink", "copy"] + else: + links = links or ["reflink", "copy"] + _try_links(links, from_fs, from_path, to_fs, to_path) except OSError as exc: # If the target file already exists, we are going to simply # ignore the exception (#4992). @@ -83,7 +107,62 @@ def transfer( and exc.__context__ and isinstance(exc.__context__, FileExistsError) ): - logger.debug("'%s' file already exists, skipping", to_info) + logger.debug("'%s' file already exists, skipping", to_path) return None raise + + +def _test_link( + link: "AnyPath", + from_fs: "BaseFileSystem", + from_file: "AnyPath", + to_fs: "BaseFileSystem", + to_file: "AnyPath", +) -> bool: + try: + _try_links([link], from_fs, from_file, to_fs, to_file) + except OSError: + logger.debug("", exc_info=True) + return False + + try: + _is_link_func = getattr(to_fs, f"is_{link}") + return _is_link_func(to_file) + except AttributeError: + pass + + return True + + +def test_links( + links: List["AnyPath"], + from_fs: "BaseFileSystem", + from_path: "AnyPath", + to_fs: "BaseFileSystem", + to_path: "AnyPath", +) -> List["AnyPath"]: + from dvc.utils import tmp_fname + + from_file = from_fs.path.join(from_path, tmp_fname()) + to_file = to_fs.path.join( + to_fs.path.parent(to_path), + tmp_fname(), + ) + + from_fs.makedirs(from_fs.path.parent(from_file)) + with from_fs.open(from_file, "wb") as fobj: + fobj.write(b"test") + + ret = [] + try: + for link in links: + try: + if _test_link(link, from_fs, from_file, to_fs, to_file): + ret.append(link) + finally: + to_fs.remove(to_file) + finally: + from_fs.remove(from_file) + + return ret diff --git a/dvc/info.py b/dvc/info.py index 736b665f7d..4455d0afea 100644 --- a/dvc/info.py +++ b/dvc/info.py @@ -1,18 +1,16 @@ -import errno import itertools import os import pathlib import platform -import uuid import psutil from dvc import __version__ from dvc.exceptions import NotDvcRepoError from dvc.fs import FS_MAP, get_fs_cls, get_fs_config +from dvc.fs.utils import test_links from dvc.repo import Repo from dvc.scm.base import SCMError -from dvc.system import System from dvc.utils import error_link from dvc.utils.pkg import PKG @@ -83,37 +81,17 @@ def _get_remotes(config): def _get_linktype_support_info(repo): + odb = repo.odb.local + + links = test_links( + ["reflink", "hardlink", "symlink"], + odb.fs, + odb.fs_path, + repo.fs, + repo.root_dir, + ) - links = { - "reflink": (System.reflink, None), - "hardlink": (System.hardlink, System.is_hardlink), - "symlink": (System.symlink, System.is_symlink), - } - - fname = "." + str(uuid.uuid4()) - src = os.path.join(repo.odb.local.cache_dir, fname) - open(src, "w", encoding="utf-8").close() - dst = os.path.join(repo.root_dir, fname) - - cache = [] - - for name, (link, is_link) in links.items(): - try: - link(src, dst) - status = "supported" - if is_link and not is_link(dst): - status = "broken" - os.unlink(dst) - except OSError as exc: - if exc.errno != errno.ENOTSUP: - raise - status = "not supported" - - if status == "supported": - cache.append(name) - os.remove(src) - - return ", ".join(cache) + return ", ".join(links) def _get_supported_remotes(): diff --git a/dvc/objects/checkout.py b/dvc/objects/checkout.py index bd737c0994..6c700eede5 100644 --- a/dvc/objects/checkout.py +++ b/dvc/objects/checkout.py @@ -1,16 +1,9 @@ -import errno import logging from itertools import chain -from shortuuid import uuid - from dvc import prompt -from dvc.exceptions import ( - CacheLinkError, - CheckoutError, - ConfirmRemoveError, - DvcException, -) +from dvc.exceptions import CacheLinkError, CheckoutError, ConfirmRemoveError +from dvc.fs.utils import test_links, transfer from dvc.ignore import DvcIgnoreFilter from dvc.objects.db.slow_link_detection import ( # type: ignore[attr-defined] slow_link_guard, @@ -43,88 +36,9 @@ def _remove(fs_path, fs, in_cache, force=False): fs.remove(fs_path) -def _verify_link(cache, fs_path, link_type): - if link_type == "hardlink" and cache.fs.getsize(fs_path) == 0: - return - - if cache.cache_type_confirmed: - return - - is_link = getattr(cache.fs, f"is_{link_type}", None) - if is_link and not is_link(fs_path): - cache.fs.remove(fs_path) - raise DvcException(f"failed to verify {link_type}") - - cache.cache_type_confirmed = True - - -def _do_link(cache, from_info, to_info, link_method): - if cache.fs.exists(to_info): - cache.fs.remove(to_info) # broken symlink - - link_method(from_info, to_info) - - logger.debug( - "Created '%s': %s -> %s", cache.cache_types[0], from_info, to_info - ) - - -@slow_link_guard -def _try_links(cache, from_info, to_info, link_types): - while link_types: - link_method = getattr(cache.fs, link_types[0]) - try: - _do_link(cache, from_info, to_info, link_method) - _verify_link(cache, to_info, link_types[0]) - return - - except OSError as exc: - if exc.errno not in [errno.EXDEV, errno.ENOTSUP, errno.ENOSYS]: - raise - logger.debug( - "Cache type '%s' is not supported: %s", link_types[0], exc - ) - del link_types[0] - - raise CacheLinkError([to_info]) - - -def _link(cache, from_info, to_info): - cache.makedirs(cache.fs.path.parent(to_info)) - try: - _try_links(cache, from_info, to_info, cache.cache_types) - except FileNotFoundError as exc: - raise CheckoutError([str(to_info)]) from exc - - -def _confirm_cache_type(cache, fs_path): - """Checks whether cache uses copies.""" - if cache.cache_type_confirmed: - return - - if set(cache.cache_types) <= {"copy"}: - return - - workspace_file = cache.fs.path.with_name(fs_path, "." + uuid()) - test_cache_file = cache.fs.path.join( - cache.fs_path, ".cache_type_test_file" - ) - if not cache.fs.exists(test_cache_file): - cache.makedirs(cache.fs.path.parent(test_cache_file)) - with cache.fs.open(test_cache_file, "wb") as fobj: - fobj.write(bytes(1)) - try: - _link(cache, test_cache_file, workspace_file) - finally: - cache.fs.remove(workspace_file) - cache.fs.remove(test_cache_file) - - cache.cache_type_confirmed = True - - -def _relink(cache, cache_info, fs, path, in_cache, force): +def _relink(link, cache, cache_info, fs, path, in_cache, force): _remove(path, fs, in_cache, force=force) - _link(cache, cache_info, path) + link(cache, cache_info, fs, path) # NOTE: Depending on a file system (e.g. on NTFS), `_remove` might reset # read-only permissions in order to delete a hardlink to protected object, # which will also reset it for the object itself, making it unprotected, @@ -133,6 +47,7 @@ def _relink(cache, cache_info, fs, path, in_cache, force): def _checkout_file( + link, fs_path, fs, change, @@ -144,7 +59,6 @@ def _checkout_file( ): """The file is changed we need to checkout a new copy""" modified = False - _confirm_cache_type(cache, fs_path) cache_fs_path = cache.hash_to_path(change.new.oid.value) if change.old.oid: @@ -153,6 +67,7 @@ def _checkout_file( cache.unprotect(fs_path) else: _relink( + link, cache, cache_fs_path, fs, @@ -163,6 +78,7 @@ def _checkout_file( else: modified = True _relink( + link, cache, cache_fs_path, fs, @@ -171,7 +87,7 @@ def _checkout_file( force=force, ) else: - _link(cache, cache_fs_path, fs_path) + link(cache, cache_fs_path, fs, fs_path) modified = True if state: @@ -212,6 +128,24 @@ def _diff( return diff +class Link: + def __init__(self, links): + self._links = links + + @slow_link_guard + def __call__(self, cache, from_path, to_fs, to_path): + if to_fs.exists(to_path): + to_fs.remove(to_path) # broken symlink + + cache.makedirs(cache.fs.path.parent(to_path)) + try: + transfer(cache.fs, from_path, to_fs, to_path, links=self._links) + except FileNotFoundError as exc: + raise CheckoutError([to_path]) from exc + except OSError as exc: + raise CacheLinkError([to_path]) from exc + + def _checkout( diff, fs_path, @@ -225,6 +159,11 @@ def _checkout( if not diff: return + links = test_links(cache.cache_types, cache.fs, cache.fs_path, fs, fs_path) + if not links: + raise CacheLinkError([fs_path]) + link = Link(links) + for change in diff.deleted: entry_path = ( fs.path.join(fs_path, *change.old.key) @@ -246,6 +185,7 @@ def _checkout( try: _checkout_file( + link, entry_path, fs, change, diff --git a/dvc/objects/db/base.py b/dvc/objects/db/base.py index 8461cd95c7..7abcb28ddc 100644 --- a/dvc/objects/db/base.py +++ b/dvc/objects/db/base.py @@ -31,7 +31,6 @@ def __init__(self, fs: "BaseFileSystem", path: str, **config): self.state = config.get("state", StateNoop()) self.verify = config.get("verify", self.DEFAULT_VERIFY) self.cache_types = config.get("type") or copy(self.DEFAULT_CACHE_TYPES) - self.cache_type_confirmed = False self.slow_link_warning = config.get("slow_link_warning", True) self.tmp_dir = config.get("tmp_dir") self.read_only = config.get("read_only", False) diff --git a/dvc/objects/db/slow_link_detection.py b/dvc/objects/db/slow_link_detection.py index 1786bae6d1..67f82dab29 100644 --- a/dvc/objects/db/slow_link_detection.py +++ b/dvc/objects/db/slow_link_detection.py @@ -25,15 +25,15 @@ def slow_link_guard(f): @wraps(f) - def wrapper(odb, *args, **kwargs): + def wrapper(self, odb, *args, **kwargs): if this.already_displayed: - return f(odb, *args, **kwargs) + return f(self, odb, *args, **kwargs) if not odb.slow_link_warning or odb.cache_types: - return f(odb, *args, **kwargs) + return f(self, odb, *args, **kwargs) start = time.time() - result = f(odb, *args, **kwargs) + result = f(self, odb, *args, **kwargs) delta = time.time() - start if delta >= this.timeout_seconds: diff --git a/tests/func/test_add.py b/tests/func/test_add.py index 0a8e86d2db..76fa17b369 100644 --- a/tests/func/test_add.py +++ b/tests/func/test_add.py @@ -861,7 +861,7 @@ def test_add_optimization_for_hardlink_on_empty_files(tmp_dir, dvc, mocker): m = mocker.spy(LocalFileSystem, "is_hardlink") stages = dvc.add(["foo", "bar", "lorem", "ipsum"]) - assert m.call_count == 1 + assert m.call_count == 4 assert m.call_args != call(tmp_dir / "foo") assert m.call_args != call(tmp_dir / "bar") @@ -958,8 +958,8 @@ def test_add_with_cache_link_error(tmp_dir, dvc, mocker, capsys): tmp_dir.gen("foo", "foo") mocker.patch( - "dvc.objects.checkout._do_link", - side_effect=OSError(errno.ENOTSUP, "link failed"), + "dvc.objects.checkout.test_links", + return_value=[], ) dvc.add("foo") err = capsys.readouterr()[1] diff --git a/tests/unit/remote/test_slow_link_detection.py b/tests/unit/remote/test_slow_link_detection.py index 07e884e9e7..5913a90e39 100644 --- a/tests/unit/remote/test_slow_link_detection.py +++ b/tests/unit/remote/test_slow_link_detection.py @@ -24,8 +24,8 @@ def _make_remote(cache_type=None, should_warn=True): def test_show_warning_once(caplog, make_remote): remote = make_remote() - slow_link_guard(lambda x: None)(remote) - slow_link_guard(lambda x: None)(remote) + slow_link_guard(lambda x, y: None)(None, remote) + slow_link_guard(lambda x, y: None)(None, remote) slow_link_detection = dvc.objects.db.slow_link_detection message = slow_link_detection.message # noqa, pylint: disable=no-member @@ -35,13 +35,13 @@ def test_show_warning_once(caplog, make_remote): def test_dont_warn_when_cache_type_is_set(caplog, make_remote): remote = make_remote(cache_type="copy") - slow_link_guard(lambda x: None)(remote) + slow_link_guard(lambda x, y: None)(None, remote) assert len(caplog.records) == 0 def test_dont_warn_when_warning_is_disabled(caplog, make_remote): remote = make_remote(should_warn=False) - slow_link_guard(lambda x: None)(remote) + slow_link_guard(lambda x, y: None)(None, remote) assert len(caplog.records) == 0