Skip to content

Commit

Permalink
dvc: use transfer in checkout/doctor/odb
Browse files Browse the repository at this point in the history
All three of those use more or less the same logic for testing different
link types and maybe then using them. So now instead of having 3
implementations, we'll have just one.

Filesystem `transfer` now will not try to catch particular errno's, but
instead will just ignore any OSError and move down to the next link type
to try out, and if the last one fails - it will raise the last exception,
so if it is a common problem - we'll be able to see it.

This patch also removes `odb.confirmed_cache_type`, as it was caching
link type that is not really a cache-level thing, but rather a checkout thing,
that should only be cached for particular objects we are linking, as multiple
objects could be linked to different filesystems from the same odb (e.g.
external local output on hdd and local data on your ssd).

Next step here will be to introduce metadata diff, so that we can get rid of
temporary Link wrapper and introduce Transfer and corresponding transfer
config, that will know what links to use and could have some other things
like callbacks, so we don't continue passing dozens of kwargs along the stack.

Continuing from #6963

Fixes #6980
  • Loading branch information
efiop committed Nov 15, 2021
1 parent 7df6090 commit 88660f6
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 178 deletions.
5 changes: 3 additions & 2 deletions dvc/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ def info(self, path):
def put_file(
self, from_file, to_info, callback=DEFAULT_CALLBACK, **kwargs
):
makedirs(self.path.parent(to_info), exist_ok=True)
tmp_file = tmp_fname(to_info)
parent = self.path.parent(to_info)
makedirs(parent, exist_ok=True)
tmp_file = self.path.join(parent, tmp_fname())
copyfile(from_file, tmp_file, callback=callback)
os.replace(tmp_file, to_info)

Expand Down
157 changes: 118 additions & 39 deletions dvc/fs/utils.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,99 @@
import errno
import logging
import os
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List, Optional

from .base import RemoteActionNotImplemented
from .local import LocalFileSystem

if TYPE_CHECKING:
from dvc.types import AnyPath

from .base import BaseFileSystem
from .types import AnyPath

logger = logging.getLogger(__name__)


def _link(
link: "AnyPath",
from_fs: "BaseFileSystem",
from_info: "AnyPath",
from_path: "AnyPath",
to_fs: "BaseFileSystem",
to_info: "AnyPath",
hardlink: bool = False,
to_path: "AnyPath",
) -> None:
if not isinstance(from_fs, type(to_fs)):
raise OSError(errno.EXDEV, "can't link across filesystems")

links = ["reflink"] + ["hardlink"] if hardlink else []
try:
func = getattr(to_fs, link)
func(from_path, to_path)
except (OSError, AttributeError, RemoteActionNotImplemented) as exc:
# NOTE: there are so many potential error codes that a link call can
# throw, that is safer to just catch them all and try another link.
raise OSError(
errno.ENOTSUP, f"'{link}' is not supported by {type(from_fs)}"
) from exc


def _copy(
from_fs: "BaseFileSystem",
from_path: "AnyPath",
to_fs: "BaseFileSystem",
to_path: "AnyPath",
) -> None:
if isinstance(from_fs, LocalFileSystem):
return to_fs.upload(from_path, to_path)

if isinstance(to_fs, LocalFileSystem):
return from_fs.download_file(from_path, to_path)

with from_fs.open(from_path, mode="rb") as fobj:
size = from_fs.getsize(from_path)
return to_fs.upload(fobj, to_path, total=size)


def _try_links(
links: List["AnyPath"],
from_fs: "BaseFileSystem",
from_path: "AnyPath",
to_fs: "BaseFileSystem",
to_path: "AnyPath",
) -> None:
error = None
while links:
link = links.pop(0)
try:
func = getattr(to_fs, link)
except AttributeError:
continue
link = links[0]

if link == "copy":
return _copy(from_fs, from_path, to_fs, to_path)

try:
return func(from_info, to_info)
except RemoteActionNotImplemented:
continue
return _link(link, from_fs, from_path, to_fs, to_path)
except OSError as exc:
if exc.errno not in [
errno.EXDEV,
errno.ENOTSUP,
errno.ENOSYS, # https://github.com/iterative/dvc/issues/6962
]:
if exc.errno not in [errno.ENOTSUP, errno.EXDEV]:
raise
error = exc

del links[0]

raise OSError(errno.ENOTSUP, "reflink and hardlink are not supported")
raise OSError(
errno.ENOTSUP, "no more link types left to try out"
) from error


def transfer(
from_fs: "BaseFileSystem",
from_info: "AnyPath",
from_path: "AnyPath",
to_fs: "BaseFileSystem",
to_info: "AnyPath",
to_path: "AnyPath",
hardlink: bool = False,
links: Optional[List["AnyPath"]] = None,
) -> None:
try:
try:
return _link(from_fs, from_info, to_fs, to_info, hardlink=hardlink)
except OSError as exc:
if exc.errno not in [errno.EXDEV, errno.ENOTSUP]:
raise

if isinstance(from_fs, LocalFileSystem):
return to_fs.upload(from_info, to_info)

if isinstance(to_fs, LocalFileSystem):
return from_fs.download_file(from_info, to_info)

with from_fs.open(from_info, mode="rb") as fobj:
size = from_fs.getsize(from_info)
return to_fs.upload(fobj, to_info, total=size)
assert not (hardlink and links)
if hardlink:
links = links or ["reflink", "hardlink", "copy"]
else:
links = links or ["reflink", "copy"]
_try_links(links, from_fs, from_path, to_fs, to_path)
except OSError as exc:
# If the target file already exists, we are going to simply
# ignore the exception (#4992).
Expand All @@ -83,7 +107,62 @@ def transfer(
and exc.__context__
and isinstance(exc.__context__, FileExistsError)
):
logger.debug("'%s' file already exists, skipping", to_info)
logger.debug("'%s' file already exists, skipping", to_path)
return None

raise


def _test_link(
link: "AnyPath",
from_fs: "BaseFileSystem",
from_file: "AnyPath",
to_fs: "BaseFileSystem",
to_file: "AnyPath",
) -> bool:
try:
_try_links([link], from_fs, from_file, to_fs, to_file)
except OSError:
logger.debug("", exc_info=True)
return False

try:
_is_link_func = getattr(to_fs, f"is_{link}")
return _is_link_func(to_file)
except AttributeError:
pass

return True


def test_links(
links: List["AnyPath"],
from_fs: "BaseFileSystem",
from_path: "AnyPath",
to_fs: "BaseFileSystem",
to_path: "AnyPath",
) -> List["AnyPath"]:
from dvc.utils import tmp_fname

from_file = from_fs.path.join(from_path, tmp_fname())
to_file = to_fs.path.join(
to_fs.path.parent(to_path),
tmp_fname(),
)

from_fs.makedirs(from_fs.path.parent(from_file))
with from_fs.open(from_file, "wb") as fobj:
fobj.write(b"test")

ret = []
try:
for link in links:
try:
if _test_link(link, from_fs, from_file, to_fs, to_file):
ret.append(link)
finally:
to_fs.remove(to_file)
finally:
from_fs.remove(from_file)

return ret
44 changes: 11 additions & 33 deletions dvc/info.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import errno
import itertools
import os
import pathlib
import platform
import uuid

import psutil

from dvc import __version__
from dvc.exceptions import NotDvcRepoError
from dvc.fs import FS_MAP, get_fs_cls, get_fs_config
from dvc.fs.utils import test_links
from dvc.repo import Repo
from dvc.scm.base import SCMError
from dvc.system import System
from dvc.utils import error_link
from dvc.utils.pkg import PKG

Expand Down Expand Up @@ -83,37 +81,17 @@ def _get_remotes(config):


def _get_linktype_support_info(repo):
odb = repo.odb.local

links = test_links(
["reflink", "hardlink", "symlink"],
odb.fs,
odb.fs_path,
repo.fs,
repo.root_dir,
)

links = {
"reflink": (System.reflink, None),
"hardlink": (System.hardlink, System.is_hardlink),
"symlink": (System.symlink, System.is_symlink),
}

fname = "." + str(uuid.uuid4())
src = os.path.join(repo.odb.local.cache_dir, fname)
open(src, "w", encoding="utf-8").close()
dst = os.path.join(repo.root_dir, fname)

cache = []

for name, (link, is_link) in links.items():
try:
link(src, dst)
status = "supported"
if is_link and not is_link(dst):
status = "broken"
os.unlink(dst)
except OSError as exc:
if exc.errno != errno.ENOTSUP:
raise
status = "not supported"

if status == "supported":
cache.append(name)
os.remove(src)

return ", ".join(cache)
return ", ".join(links)


def _get_supported_remotes():
Expand Down
Loading

0 comments on commit 88660f6

Please sign in to comment.