diff --git a/dvc/fs/local.py b/dvc/fs/local.py index 2e191fb0cc..51bdfc0b54 100644 --- a/dvc/fs/local.py +++ b/dvc/fs/local.py @@ -153,8 +153,9 @@ def info(self, path): def put_file( self, from_file, to_info, callback=DEFAULT_CALLBACK, **kwargs ): - makedirs(self.path.parent(to_info), exist_ok=True) - tmp_file = tmp_fname(to_info) + parent = self.path.parent(to_info) + makedirs(parent, exist_ok=True) + tmp_file = self.path.join(parent, tmp_fname()) copyfile(from_file, tmp_file, callback=callback) os.replace(tmp_file, to_info) diff --git a/dvc/fs/utils.py b/dvc/fs/utils.py index 1ddf8099e0..d408b83803 100644 --- a/dvc/fs/utils.py +++ b/dvc/fs/utils.py @@ -1,75 +1,99 @@ import errno import logging import os -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List, Optional from .base import RemoteActionNotImplemented from .local import LocalFileSystem if TYPE_CHECKING: - from dvc.types import AnyPath - from .base import BaseFileSystem + from .types import AnyPath logger = logging.getLogger(__name__) def _link( + link: "AnyPath", from_fs: "BaseFileSystem", - from_info: "AnyPath", + from_path: "AnyPath", to_fs: "BaseFileSystem", - to_info: "AnyPath", - hardlink: bool = False, + to_path: "AnyPath", ) -> None: if not isinstance(from_fs, type(to_fs)): raise OSError(errno.EXDEV, "can't link across filesystems") - links = ["reflink"] + ["hardlink"] if hardlink else [] + try: + func = getattr(to_fs, link) + func(from_path, to_path) + except (OSError, AttributeError, RemoteActionNotImplemented) as exc: + # NOTE: there are so many potential error codes that a link call can + # throw, that it is safer to just catch them all and try another link. 
+ raise OSError( + errno.ENOTSUP, f"'{link}' is not supported by {type(from_fs)}" + ) from exc + + +def _copy( + from_fs: "BaseFileSystem", + from_path: "AnyPath", + to_fs: "BaseFileSystem", + to_path: "AnyPath", +) -> None: + if isinstance(from_fs, LocalFileSystem): + return to_fs.upload(from_path, to_path) + + if isinstance(to_fs, LocalFileSystem): + return from_fs.download_file(from_path, to_path) + + with from_fs.open(from_path, mode="rb") as fobj: + size = from_fs.getsize(from_path) + return to_fs.upload(fobj, to_path, total=size) + + +def _try_links( + links: List["AnyPath"], + from_fs: "BaseFileSystem", + from_path: "AnyPath", + to_fs: "BaseFileSystem", + to_path: "AnyPath", +) -> None: + error = None while links: - link = links.pop(0) - try: - func = getattr(to_fs, link) - except AttributeError: - continue + link = links[0] + + if link == "copy": + return _copy(from_fs, from_path, to_fs, to_path) try: - return func(from_info, to_info) - except RemoteActionNotImplemented: - continue + return _link(link, from_fs, from_path, to_fs, to_path) except OSError as exc: - if exc.errno not in [ - errno.EXDEV, - errno.ENOTSUP, - errno.ENOSYS, # https://github.com/iterative/dvc/issues/6962 - ]: + if exc.errno not in [errno.ENOTSUP, errno.EXDEV]: raise + error = exc + + del links[0] - raise OSError(errno.ENOTSUP, "reflink and hardlink are not supported") + raise OSError( + errno.ENOTSUP, "no more link types left to try out" + ) from error def transfer( from_fs: "BaseFileSystem", - from_info: "AnyPath", + from_path: "AnyPath", to_fs: "BaseFileSystem", - to_info: "AnyPath", + to_path: "AnyPath", hardlink: bool = False, + links: Optional[List["AnyPath"]] = None, ) -> None: try: - try: - return _link(from_fs, from_info, to_fs, to_info, hardlink=hardlink) - except OSError as exc: - if exc.errno not in [errno.EXDEV, errno.ENOTSUP]: - raise - - if isinstance(from_fs, LocalFileSystem): - return to_fs.upload(from_info, to_info) - - if isinstance(to_fs, LocalFileSystem): - return 
from_fs.download_file(from_info, to_info) - - with from_fs.open(from_info, mode="rb") as fobj: - size = from_fs.getsize(from_info) - return to_fs.upload(fobj, to_info, total=size) + assert not (hardlink and links) + if hardlink: + links = links or ["reflink", "hardlink", "copy"] + else: + links = links or ["reflink", "copy"] + _try_links(links, from_fs, from_path, to_fs, to_path) except OSError as exc: # If the target file already exists, we are going to simply # ignore the exception (#4992). @@ -83,7 +107,62 @@ def transfer( and exc.__context__ and isinstance(exc.__context__, FileExistsError) ): - logger.debug("'%s' file already exists, skipping", to_info) + logger.debug("'%s' file already exists, skipping", to_path) return None raise + + +def _test_link( + link: "AnyPath", + from_fs: "BaseFileSystem", + from_file: "AnyPath", + to_fs: "BaseFileSystem", + to_file: "AnyPath", +) -> bool: + try: + _try_links([link], from_fs, from_file, to_fs, to_file) + except OSError: + logger.debug("", exc_info=True) + return False + + try: + _is_link_func = getattr(to_fs, f"is_{link}") + return _is_link_func(to_file) + except AttributeError: + pass + + return True + + +def test_links( + links: List["AnyPath"], + from_fs: "BaseFileSystem", + from_path: "AnyPath", + to_fs: "BaseFileSystem", + to_path: "AnyPath", +) -> List["AnyPath"]: + from dvc.utils import tmp_fname + + from_file = from_fs.path.join(from_path, tmp_fname()) + to_file = to_fs.path.join( + to_fs.path.parent(to_path), + tmp_fname(), + ) + + from_fs.makedirs(from_fs.path.parent(from_file)) + with from_fs.open(from_file, "wb") as fobj: + fobj.write(b"test") + + ret = [] + try: + for link in links: + try: + if _test_link(link, from_fs, from_file, to_fs, to_file): + ret.append(link) + finally: + to_fs.remove(to_file) + finally: + from_fs.remove(from_file) + + return ret diff --git a/dvc/info.py b/dvc/info.py index 736b665f7d..4455d0afea 100644 --- a/dvc/info.py +++ b/dvc/info.py @@ -1,18 +1,16 @@ -import errno import 
itertools import os import pathlib import platform -import uuid import psutil from dvc import __version__ from dvc.exceptions import NotDvcRepoError from dvc.fs import FS_MAP, get_fs_cls, get_fs_config +from dvc.fs.utils import test_links from dvc.repo import Repo from dvc.scm.base import SCMError -from dvc.system import System from dvc.utils import error_link from dvc.utils.pkg import PKG @@ -83,37 +81,17 @@ def _get_remotes(config): def _get_linktype_support_info(repo): + odb = repo.odb.local + + links = test_links( + ["reflink", "hardlink", "symlink"], + odb.fs, + odb.fs_path, + repo.fs, + repo.root_dir, + ) - links = { - "reflink": (System.reflink, None), - "hardlink": (System.hardlink, System.is_hardlink), - "symlink": (System.symlink, System.is_symlink), - } - - fname = "." + str(uuid.uuid4()) - src = os.path.join(repo.odb.local.cache_dir, fname) - open(src, "w", encoding="utf-8").close() - dst = os.path.join(repo.root_dir, fname) - - cache = [] - - for name, (link, is_link) in links.items(): - try: - link(src, dst) - status = "supported" - if is_link and not is_link(dst): - status = "broken" - os.unlink(dst) - except OSError as exc: - if exc.errno != errno.ENOTSUP: - raise - status = "not supported" - - if status == "supported": - cache.append(name) - os.remove(src) - - return ", ".join(cache) + return ", ".join(links) def _get_supported_remotes(): diff --git a/dvc/objects/checkout.py b/dvc/objects/checkout.py index bd737c0994..6c700eede5 100644 --- a/dvc/objects/checkout.py +++ b/dvc/objects/checkout.py @@ -1,16 +1,9 @@ -import errno import logging from itertools import chain -from shortuuid import uuid - from dvc import prompt -from dvc.exceptions import ( - CacheLinkError, - CheckoutError, - ConfirmRemoveError, - DvcException, -) +from dvc.exceptions import CacheLinkError, CheckoutError, ConfirmRemoveError +from dvc.fs.utils import test_links, transfer from dvc.ignore import DvcIgnoreFilter from dvc.objects.db.slow_link_detection import ( # type: 
ignore[attr-defined] slow_link_guard, @@ -43,88 +36,9 @@ def _remove(fs_path, fs, in_cache, force=False): fs.remove(fs_path) -def _verify_link(cache, fs_path, link_type): - if link_type == "hardlink" and cache.fs.getsize(fs_path) == 0: - return - - if cache.cache_type_confirmed: - return - - is_link = getattr(cache.fs, f"is_{link_type}", None) - if is_link and not is_link(fs_path): - cache.fs.remove(fs_path) - raise DvcException(f"failed to verify {link_type}") - - cache.cache_type_confirmed = True - - -def _do_link(cache, from_info, to_info, link_method): - if cache.fs.exists(to_info): - cache.fs.remove(to_info) # broken symlink - - link_method(from_info, to_info) - - logger.debug( - "Created '%s': %s -> %s", cache.cache_types[0], from_info, to_info - ) - - -@slow_link_guard -def _try_links(cache, from_info, to_info, link_types): - while link_types: - link_method = getattr(cache.fs, link_types[0]) - try: - _do_link(cache, from_info, to_info, link_method) - _verify_link(cache, to_info, link_types[0]) - return - - except OSError as exc: - if exc.errno not in [errno.EXDEV, errno.ENOTSUP, errno.ENOSYS]: - raise - logger.debug( - "Cache type '%s' is not supported: %s", link_types[0], exc - ) - del link_types[0] - - raise CacheLinkError([to_info]) - - -def _link(cache, from_info, to_info): - cache.makedirs(cache.fs.path.parent(to_info)) - try: - _try_links(cache, from_info, to_info, cache.cache_types) - except FileNotFoundError as exc: - raise CheckoutError([str(to_info)]) from exc - - -def _confirm_cache_type(cache, fs_path): - """Checks whether cache uses copies.""" - if cache.cache_type_confirmed: - return - - if set(cache.cache_types) <= {"copy"}: - return - - workspace_file = cache.fs.path.with_name(fs_path, "." 
+ uuid()) - test_cache_file = cache.fs.path.join( - cache.fs_path, ".cache_type_test_file" - ) - if not cache.fs.exists(test_cache_file): - cache.makedirs(cache.fs.path.parent(test_cache_file)) - with cache.fs.open(test_cache_file, "wb") as fobj: - fobj.write(bytes(1)) - try: - _link(cache, test_cache_file, workspace_file) - finally: - cache.fs.remove(workspace_file) - cache.fs.remove(test_cache_file) - - cache.cache_type_confirmed = True - - -def _relink(cache, cache_info, fs, path, in_cache, force): +def _relink(link, cache, cache_info, fs, path, in_cache, force): _remove(path, fs, in_cache, force=force) - _link(cache, cache_info, path) + link(cache, cache_info, fs, path) # NOTE: Depending on a file system (e.g. on NTFS), `_remove` might reset # read-only permissions in order to delete a hardlink to protected object, # which will also reset it for the object itself, making it unprotected, @@ -133,6 +47,7 @@ def _relink(cache, cache_info, fs, path, in_cache, force): def _checkout_file( + link, fs_path, fs, change, @@ -144,7 +59,6 @@ def _checkout_file( ): """The file is changed we need to checkout a new copy""" modified = False - _confirm_cache_type(cache, fs_path) cache_fs_path = cache.hash_to_path(change.new.oid.value) if change.old.oid: @@ -153,6 +67,7 @@ def _checkout_file( cache.unprotect(fs_path) else: _relink( + link, cache, cache_fs_path, fs, @@ -163,6 +78,7 @@ def _checkout_file( else: modified = True _relink( + link, cache, cache_fs_path, fs, @@ -171,7 +87,7 @@ def _checkout_file( force=force, ) else: - _link(cache, cache_fs_path, fs_path) + link(cache, cache_fs_path, fs, fs_path) modified = True if state: @@ -212,6 +128,24 @@ def _diff( return diff +class Link: + def __init__(self, links): + self._links = links + + @slow_link_guard + def __call__(self, cache, from_path, to_fs, to_path): + if to_fs.exists(to_path): + to_fs.remove(to_path) # broken symlink + + cache.makedirs(cache.fs.path.parent(to_path)) + try: + transfer(cache.fs, from_path, to_fs, 
to_path, links=self._links) + except FileNotFoundError as exc: + raise CheckoutError([to_path]) from exc + except OSError as exc: + raise CacheLinkError([to_path]) from exc + + def _checkout( diff, fs_path, @@ -225,6 +159,11 @@ def _checkout( if not diff: return + links = test_links(cache.cache_types, cache.fs, cache.fs_path, fs, fs_path) + if not links: + raise CacheLinkError([fs_path]) + link = Link(links) + for change in diff.deleted: entry_path = ( fs.path.join(fs_path, *change.old.key) @@ -246,6 +185,7 @@ def _checkout( try: _checkout_file( + link, entry_path, fs, change, diff --git a/dvc/objects/db/base.py b/dvc/objects/db/base.py index 8461cd95c7..7abcb28ddc 100644 --- a/dvc/objects/db/base.py +++ b/dvc/objects/db/base.py @@ -31,7 +31,6 @@ def __init__(self, fs: "BaseFileSystem", path: str, **config): self.state = config.get("state", StateNoop()) self.verify = config.get("verify", self.DEFAULT_VERIFY) self.cache_types = config.get("type") or copy(self.DEFAULT_CACHE_TYPES) - self.cache_type_confirmed = False self.slow_link_warning = config.get("slow_link_warning", True) self.tmp_dir = config.get("tmp_dir") self.read_only = config.get("read_only", False) diff --git a/dvc/objects/db/slow_link_detection.py b/dvc/objects/db/slow_link_detection.py index 1786bae6d1..67f82dab29 100644 --- a/dvc/objects/db/slow_link_detection.py +++ b/dvc/objects/db/slow_link_detection.py @@ -25,15 +25,15 @@ def slow_link_guard(f): @wraps(f) - def wrapper(odb, *args, **kwargs): + def wrapper(self, odb, *args, **kwargs): if this.already_displayed: - return f(odb, *args, **kwargs) + return f(self, odb, *args, **kwargs) if not odb.slow_link_warning or odb.cache_types: - return f(odb, *args, **kwargs) + return f(self, odb, *args, **kwargs) start = time.time() - result = f(odb, *args, **kwargs) + result = f(self, odb, *args, **kwargs) delta = time.time() - start if delta >= this.timeout_seconds: diff --git a/tests/func/test_add.py b/tests/func/test_add.py index 0a8e86d2db..76fa17b369 
100644 --- a/tests/func/test_add.py +++ b/tests/func/test_add.py @@ -861,7 +861,7 @@ def test_add_optimization_for_hardlink_on_empty_files(tmp_dir, dvc, mocker): m = mocker.spy(LocalFileSystem, "is_hardlink") stages = dvc.add(["foo", "bar", "lorem", "ipsum"]) - assert m.call_count == 1 + assert m.call_count == 4 assert m.call_args != call(tmp_dir / "foo") assert m.call_args != call(tmp_dir / "bar") @@ -958,8 +958,8 @@ def test_add_with_cache_link_error(tmp_dir, dvc, mocker, capsys): tmp_dir.gen("foo", "foo") mocker.patch( - "dvc.objects.checkout._do_link", - side_effect=OSError(errno.ENOTSUP, "link failed"), + "dvc.objects.checkout.test_links", + return_value=[], ) dvc.add("foo") err = capsys.readouterr()[1] diff --git a/tests/unit/remote/test_slow_link_detection.py b/tests/unit/remote/test_slow_link_detection.py index 07e884e9e7..5913a90e39 100644 --- a/tests/unit/remote/test_slow_link_detection.py +++ b/tests/unit/remote/test_slow_link_detection.py @@ -24,8 +24,8 @@ def _make_remote(cache_type=None, should_warn=True): def test_show_warning_once(caplog, make_remote): remote = make_remote() - slow_link_guard(lambda x: None)(remote) - slow_link_guard(lambda x: None)(remote) + slow_link_guard(lambda x, y: None)(None, remote) + slow_link_guard(lambda x, y: None)(None, remote) slow_link_detection = dvc.objects.db.slow_link_detection message = slow_link_detection.message # noqa, pylint: disable=no-member @@ -35,13 +35,13 @@ def test_show_warning_once(caplog, make_remote): def test_dont_warn_when_cache_type_is_set(caplog, make_remote): remote = make_remote(cache_type="copy") - slow_link_guard(lambda x: None)(remote) + slow_link_guard(lambda x, y: None)(None, remote) assert len(caplog.records) == 0 def test_dont_warn_when_warning_is_disabled(caplog, make_remote): remote = make_remote(should_warn=False) - slow_link_guard(lambda x: None)(remote) + slow_link_guard(lambda x, y: None)(None, remote) assert len(caplog.records) == 0