From 6ea5a7ce22946910655624c9e96e1b18bd9ad65a Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Sun, 19 Jul 2020 21:33:07 +0300 Subject: [PATCH] dvc: separate cache and remote classes Finalizes code part of #4050 --- dvc/{cache.py => cache/__init__.py} | 8 +- dvc/cache/base.py | 554 ++++++++++++++++++++++++++++ dvc/cache/local.py | 431 ++++++++++++++++++++++ dvc/remote/base.py | 552 +-------------------------- dvc/remote/local.py | 435 +--------------------- dvc/repo/status.py | 2 +- dvc/repo/tree.py | 3 +- dvc/stage/cache.py | 2 +- dvc/tree/base.py | 13 - tests/func/remote/test_index.py | 3 +- tests/func/test_add.py | 2 +- tests/func/test_cache.py | 2 +- tests/func/test_checkout.py | 3 +- tests/func/test_data_cloud.py | 4 +- tests/func/test_external_repo.py | 2 +- tests/func/test_gc.py | 2 +- tests/func/test_import.py | 2 +- tests/func/test_repro.py | 2 +- tests/func/test_s3.py | 2 +- tests/func/test_stage.py | 2 +- tests/unit/output/test_local.py | 2 +- tests/unit/remote/test_local.py | 2 +- 22 files changed, 1011 insertions(+), 1019 deletions(-) rename dvc/{cache.py => cache/__init__.py} (96%) create mode 100644 dvc/cache/base.py create mode 100644 dvc/cache/local.py diff --git a/dvc/cache.py b/dvc/cache/__init__.py similarity index 96% rename from dvc/cache.py rename to dvc/cache/__init__.py index 291808b9a1..1cb0fc994b 100644 --- a/dvc/cache.py +++ b/dvc/cache/__init__.py @@ -28,8 +28,8 @@ def _make_remote_property(name): """ def getter(self): - from dvc.remote import get_cloud_tree - from dvc.remote.base import CloudCache + from ..tree import get_cloud_tree + from .base import CloudCache remote = self.config.get(name) if not remote: @@ -52,8 +52,8 @@ class Cache: CACHE_DIR = "cache" def __init__(self, repo): - from dvc.remote import get_cloud_tree - from dvc.remote.local import LocalCache + from ..tree import get_cloud_tree + from .local import LocalCache self.repo = repo self.config = config = repo.config["cache"] diff --git a/dvc/cache/base.py b/dvc/cache/base.py new file mode 100644 index 0000000000..64617ef994 --- /dev/null +++ b/dvc/cache/base.py @@ -0,0 +1,554 @@ +import json +import logging +from copy import copy + +from shortuuid import uuid + +import dvc.prompt as prompt +from dvc.exceptions import CheckoutError, ConfirmRemoveError, DvcException +from dvc.path_info import WindowsPathInfo +from dvc.progress import Tqdm +from dvc.remote.slow_link_detection import slow_link_guard + +from ..tree.base import RemoteActionNotImplemented + +logger = logging.getLogger(__name__) + +STATUS_OK = 1 +STATUS_MISSING = 2 +STATUS_NEW = 3 +STATUS_DELETED = 4 + +STATUS_MAP = { + # (local_exists, remote_exists) + (True, True): STATUS_OK, + (False, False): STATUS_MISSING, + (True, False): STATUS_NEW, + (False, True): STATUS_DELETED, +} + + +class DirCacheError(DvcException): + def __init__(self, hash_): + super().__init__( + f"Failed to load dir cache for hash value: '{hash_}'." + ) + + +class CloudCache: + """Cloud cache class.""" + + DEFAULT_CACHE_TYPES = ["copy"] + CACHE_MODE = None + + def __init__(self, tree): + self.tree = tree + self.repo = tree.repo + + self.cache_types = tree.config.get("type") or copy( + self.DEFAULT_CACHE_TYPES + ) + self.cache_type_confirmed = False + self._dir_info = {} + + def get_dir_cache(self, hash_): + assert hash_ + + dir_info = self._dir_info.get(hash_) + if dir_info: + return dir_info + + try: + dir_info = self.load_dir_cache(hash_) + except DirCacheError: + dir_info = [] + + self._dir_info[hash_] = dir_info + return dir_info + + def load_dir_cache(self, hash_): + path_info = self.tree.hash_to_path_info(hash_) + + try: + with self.tree.open(path_info, "r") as fobj: + d = json.load(fobj) + except (ValueError, FileNotFoundError) as exc: + raise DirCacheError(hash_) from exc + + if not isinstance(d, list): + logger.error( + "dir cache file format error '%s' [skipping the file]", + path_info, + ) + return [] + + if self.tree.PATH_CLS == WindowsPathInfo: + # only need to convert it for Windows + for info in d: + # NOTE: here is a BUG, see comment to .as_posix() below + relpath = info[self.tree.PARAM_RELPATH] + info[self.tree.PARAM_RELPATH] = relpath.replace( + "/", self.tree.PATH_CLS.sep + ) + + return d + + def changed(self, path_info, hash_info): + """Checks if data has changed. + + A file is considered changed if: + - It doesn't exist on the working directory (was unlinked) + - Hash value is not computed (saving a new file) + - The hash value stored is different from the given one + - There's no file in the cache + + Args: + path_info: dict with path information. + hash: expected hash value for this data. + + Returns: + bool: True if data has changed, False otherwise. + """ + + logger.debug( + "checking if '%s'('%s') has changed.", path_info, hash_info + ) + + if not self.tree.exists(path_info): + logger.debug("'%s' doesn't exist.", path_info) + return True + + hash_ = hash_info.get(self.tree.PARAM_CHECKSUM) + if hash_ is None: + logger.debug("hash value for '%s' is missing.", path_info) + return True + + if self.changed_cache(hash_): + logger.debug("cache for '%s'('%s') has changed.", path_info, hash_) + return True + + actual = self.tree.get_hash(path_info) + if hash_ != actual: + logger.debug( + "hash value '%s' for '%s' has changed (actual '%s').", + hash_, + actual, + path_info, + ) + return True + + logger.debug("'%s' hasn't changed.", path_info) + return False + + def link(self, from_info, to_info): + self._link(from_info, to_info, self.cache_types) + + def _link(self, from_info, to_info, link_types): + assert self.tree.isfile(from_info) + + self.tree.makedirs(to_info.parent) + + self._try_links(from_info, to_info, link_types) + + def _verify_link(self, path_info, link_type): + if self.cache_type_confirmed: + return + + is_link = getattr(self.tree, f"is_{link_type}", None) + if is_link and not is_link(path_info): + self.tree.remove(path_info) + raise DvcException(f"failed to verify {link_type}") + + self.cache_type_confirmed = True + + @slow_link_guard + def _try_links(self, from_info, to_info, link_types): + while link_types: + link_method = getattr(self.tree, link_types[0]) + try: + self._do_link(from_info, to_info, link_method) + self._verify_link(to_info, link_types[0]) + return + + except DvcException as exc: + logger.debug( + "Cache type '%s' is not supported: %s", link_types[0], exc + ) + del link_types[0] + + raise DvcException("no possible cache types left to try out.") + + def _do_link(self, from_info, to_info, link_method): + if self.tree.exists(to_info): + raise DvcException(f"Link '{to_info}' already exists!") + + link_method(from_info, to_info) + + logger.debug( + "Created '%s': %s -> %s", self.cache_types[0], from_info, to_info, + ) + + def _save_file(self, path_info, tree, hash_, save_link=True, **kwargs): + assert hash_ + + cache_info = self.tree.hash_to_path_info(hash_) + if tree == self.tree: + if self.changed_cache(hash_): + self.tree.move(path_info, cache_info, mode=self.CACHE_MODE) + self.link(cache_info, path_info) + elif self.tree.iscopy(path_info) and self._cache_is_copy( + path_info + ): + # Default relink procedure involves unneeded copy + self.tree.unprotect(path_info) + else: + self.tree.remove(path_info) + self.link(cache_info, path_info) + + if save_link: + self.tree.state.save_link(path_info) + # we need to update path and cache, since in case of reflink, + # or copy cache type moving original file results in updates on + # next executed command, which causes md5 recalculation + self.tree.state.save(path_info, hash_) + else: + if self.changed_cache(hash_): + with tree.open(path_info, mode="rb") as fobj: + # if tree has fetch enabled, DVC out will be fetched on + # open and we do not need to read/copy any data + if not ( + tree.isdvc(path_info, strict=False) and tree.fetch + ): + self.tree.copy_fobj(fobj, cache_info) + callback = kwargs.get("download_callback") + if callback: + callback(1) + + self.tree.state.save(cache_info, hash_) + return {self.tree.PARAM_CHECKSUM: hash_} + + def _cache_is_copy(self, path_info): + """Checks whether cache uses copies.""" + if self.cache_type_confirmed: + return self.cache_types[0] == "copy" + + if set(self.cache_types) <= {"copy"}: + return True + + workspace_file = path_info.with_name("." + uuid()) + test_cache_file = self.tree.path_info / ".cache_type_test_file" + if not self.tree.exists(test_cache_file): + with self.tree.open(test_cache_file, "wb") as fobj: + fobj.write(bytes(1)) + try: + self.link(test_cache_file, workspace_file) + finally: + self.tree.remove(workspace_file) + self.tree.remove(test_cache_file) + + self.cache_type_confirmed = True + return self.cache_types[0] == "copy" + + def _save_dir(self, path_info, tree, hash_, save_link=True, **kwargs): + dir_info = self.get_dir_cache(hash_) + for entry in Tqdm( + dir_info, desc="Saving " + path_info.name, unit="file" + ): + entry_info = path_info / entry[self.tree.PARAM_RELPATH] + entry_hash = entry[self.tree.PARAM_CHECKSUM] + self._save_file( + entry_info, tree, entry_hash, save_link=False, **kwargs + ) + + if save_link: + self.tree.state.save_link(path_info) + if self.tree.exists(path_info): + self.tree.state.save(path_info, hash_) + + cache_info = self.tree.hash_to_path_info(hash_) + self.tree.state.save(cache_info, hash_) + return {self.tree.PARAM_CHECKSUM: hash_} + + def save(self, path_info, tree, hash_info, save_link=True, **kwargs): + if path_info.scheme != self.tree.scheme: + raise RemoteActionNotImplemented( + f"save {path_info.scheme} -> {self.tree.scheme}", + self.tree.scheme, + ) + + if not hash_info: + hash_info = self.tree.save_info(path_info, tree=tree, **kwargs) + hash_ = hash_info[self.tree.PARAM_CHECKSUM] + return self._save(path_info, tree, hash_, save_link, **kwargs) + + def _save(self, path_info, tree, hash_, save_link=True, **kwargs): + to_info = self.tree.hash_to_path_info(hash_) + logger.debug("Saving '%s' to '%s'.", path_info, to_info) + + if tree.isdir(path_info): + return self._save_dir(path_info, tree, hash_, save_link, **kwargs) + return self._save_file(path_info, tree, hash_, save_link, **kwargs) + + # Override to return path as a string instead of PathInfo for clouds + # which support string paths (see local) + def hash_to_path(self, hash_): + return self.tree.hash_to_path_info(hash_) + + def changed_cache_file(self, hash_): + """Compare the given hash with the (corresponding) actual one. + + - Use `State` as a cache for computed hashes + + The entries are invalidated by taking into account the following: + * mtime + * inode + * size + * hash + + - Remove the file from cache if it doesn't match the actual hash + """ + # Prefer string path over PathInfo when possible due to performance + cache_info = self.hash_to_path(hash_) + if self.tree.is_protected(cache_info): + logger.debug( + "Assuming '%s' is unchanged since it is read-only", cache_info + ) + return False + + actual = self.tree.get_hash(cache_info) + + logger.debug( + "cache '%s' expected '%s' actual '%s'", cache_info, hash_, actual, + ) + + if not hash_ or not actual: + return True + + if actual.split(".")[0] == hash_.split(".")[0]: + # making cache file read-only so we don't need to check it + # next time + self.tree.protect(cache_info) + return False + + if self.tree.exists(cache_info): + logger.warning("corrupted cache file '%s'.", cache_info) + self.tree.remove(cache_info) + + return True + + def _changed_dir_cache(self, hash_, path_info=None, filter_info=None): + if self.changed_cache_file(hash_): + return True + + for entry in self.get_dir_cache(hash_): + entry_hash = entry[self.tree.PARAM_CHECKSUM] + + if path_info and filter_info: + entry_info = path_info / entry[self.tree.PARAM_RELPATH] + if not entry_info.isin_or_eq(filter_info): + continue + + if self.changed_cache_file(entry_hash): + return True + + return False + + def changed_cache(self, hash_, path_info=None, filter_info=None): + if self.tree.is_dir_hash(hash_): + return self._changed_dir_cache( + hash_, path_info=path_info, filter_info=filter_info + ) + return self.changed_cache_file(hash_) + + def already_cached(self, path_info): + current = self.tree.get_hash(path_info) + + if not current: + return False + + return not self.changed_cache(current) + + def safe_remove(self, path_info, force=False): + if not self.tree.exists(path_info): + return + + if not force and not self.already_cached(path_info): + msg = ( + "file '{}' is going to be removed." + " Are you sure you want to proceed?".format(str(path_info)) + ) + + if not prompt.confirm(msg): + raise ConfirmRemoveError(str(path_info)) + + self.tree.remove(path_info) + + def _checkout_file( + self, path_info, hash_, force, progress_callback=None, relink=False + ): + """The file is changed we need to checkout a new copy""" + added, modified = True, False + cache_info = self.tree.hash_to_path_info(hash_) + if self.tree.exists(path_info): + logger.debug("data '%s' will be replaced.", path_info) + self.safe_remove(path_info, force=force) + added, modified = False, True + + self.link(cache_info, path_info) + self.tree.state.save_link(path_info) + self.tree.state.save(path_info, hash_) + if progress_callback: + progress_callback(str(path_info)) + + return added, modified and not relink + + def _checkout_dir( + self, + path_info, + hash_, + force, + progress_callback=None, + relink=False, + filter_info=None, + ): + added, modified = False, False + # Create dir separately so that dir is created + # even if there are no files in it + if not self.tree.exists(path_info): + added = True + self.tree.makedirs(path_info) + + dir_info = self.get_dir_cache(hash_) + + logger.debug("Linking directory '%s'.", path_info) + + for entry in dir_info: + relative_path = entry[self.tree.PARAM_RELPATH] + entry_hash = entry[self.tree.PARAM_CHECKSUM] + entry_cache_info = self.tree.hash_to_path_info(entry_hash) + entry_info = path_info / relative_path + + if filter_info and not entry_info.isin_or_eq(filter_info): + continue + + entry_hash_info = {self.tree.PARAM_CHECKSUM: entry_hash} + if relink or self.changed(entry_info, entry_hash_info): + modified = True + self.safe_remove(entry_info, force=force) + self.link(entry_cache_info, entry_info) + self.tree.state.save(entry_info, entry_hash) + if progress_callback: + progress_callback(str(entry_info)) + + modified = ( + self._remove_redundant_files(path_info, dir_info, force) + or modified + ) + + self.tree.state.save_link(path_info) + self.tree.state.save(path_info, hash_) + + # relink is not modified, assume it as nochange + return added, not added and modified and not relink + + def _remove_redundant_files(self, path_info, dir_info, force): + existing_files = set(self.tree.walk_files(path_info)) + + needed_files = { + path_info / entry[self.tree.PARAM_RELPATH] for entry in dir_info + } + redundant_files = existing_files - needed_files + for path in redundant_files: + self.safe_remove(path, force) + + return bool(redundant_files) + + def checkout( + self, + path_info, + hash_info, + force=False, + progress_callback=None, + relink=False, + filter_info=None, + ): + if path_info.scheme not in ["local", self.tree.scheme]: + raise NotImplementedError + + hash_ = hash_info.get(self.tree.PARAM_CHECKSUM) + failed = None + skip = False + if not hash_: + logger.warning( + "No file hash info found for '%s'. " "It won't be created.", + path_info, + ) + self.safe_remove(path_info, force=force) + failed = path_info + + elif not relink and not self.changed(path_info, hash_info): + logger.debug("Data '%s' didn't change.", path_info) + skip = True + + elif self.changed_cache( + hash_, path_info=path_info, filter_info=filter_info + ): + logger.warning( + "Cache '%s' not found. File '%s' won't be created.", + hash_, + path_info, + ) + self.safe_remove(path_info, force=force) + failed = path_info + + if failed or skip: + if progress_callback: + progress_callback( + str(path_info), + self.get_files_number( + self.tree.path_info, hash_, filter_info + ), + ) + if failed: + raise CheckoutError([failed]) + return + + logger.debug("Checking out '%s' with cache '%s'.", path_info, hash_) + + return self._checkout( + path_info, hash_, force, progress_callback, relink, filter_info, + ) + + def _checkout( + self, + path_info, + hash_, + force=False, + progress_callback=None, + relink=False, + filter_info=None, + ): + if not self.tree.is_dir_hash(hash_): + return self._checkout_file( + path_info, hash_, force, progress_callback, relink + ) + + return self._checkout_dir( + path_info, hash_, force, progress_callback, relink, filter_info + ) + + def get_files_number(self, path_info, hash_, filter_info): + from funcy.py3 import ilen + + if not hash_: + return 0 + + if not self.tree.is_dir_hash(hash_): + return 1 + + if not filter_info: + return len(self.get_dir_cache(hash_)) + + return ilen( + filter_info.isin_or_eq(path_info / entry[self.tree.PARAM_CHECKSUM]) + for entry in self.get_dir_cache(hash_) + ) diff --git a/dvc/cache/local.py b/dvc/cache/local.py new file mode 100644 index 0000000000..1db76fb239 --- /dev/null +++ b/dvc/cache/local.py @@ -0,0 +1,431 @@ +import errno +import logging +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +from functools import partial, wraps + +from funcy import cached_property, concat + +from dvc.exceptions import DownloadError, UploadError +from dvc.path_info import PathInfo +from dvc.progress import Tqdm + +from ..remote.base import index_locked +from ..tree.local import LocalTree +from .base import ( + STATUS_DELETED, + STATUS_MAP, + STATUS_MISSING, + STATUS_NEW, + CloudCache, +) + +logger = logging.getLogger(__name__) + + +def _log_exceptions(func, operation): + @wraps(func) + def wrapper(from_info, to_info, *args, **kwargs): + try: + func(from_info, to_info, *args, **kwargs) + return 0 + except Exception as exc: # pylint: disable=broad-except + # NOTE: this means we ran out of file descriptors and there is no + # reason to try to proceed, as we will hit this error anyways. + # pylint: disable=no-member + if isinstance(exc, OSError) and exc.errno == errno.EMFILE: + raise + + logger.exception( + "failed to %s '%s' to '%s'", operation, from_info, to_info + ) + return 1 + + return wrapper + + +class LocalCache(CloudCache): + DEFAULT_CACHE_TYPES = ["reflink", "copy"] + CACHE_MODE = LocalTree.CACHE_MODE + + def __init__(self, tree): + super().__init__(tree) + self.cache_dir = tree.config.get("url") + + @property + def cache_dir(self): + return self.tree.path_info.fspath if self.tree.path_info else None + + @cache_dir.setter + def cache_dir(self, value): + self.tree.path_info = PathInfo(value) if value else None + + @classmethod + def supported(cls, config): # pylint: disable=unused-argument + return True + + @cached_property + def cache_path(self): + return os.path.abspath(self.cache_dir) + + def hash_to_path(self, hash_): + # NOTE: `self.cache_path` is already normalized so we can simply use + # `os.sep` instead of `os.path.join`. This results in this helper + # being ~5.5 times faster. + return f"{self.cache_path}{os.sep}{hash_[0:2]}{os.sep}{hash_[2:]}" + + def hashes_exist( + self, hashes, jobs=None, name=None + ): # pylint: disable=unused-argument + return [ + hash_ + for hash_ in Tqdm( + hashes, + unit="file", + desc="Querying " + + ("cache in " + name if name else "local cache"), + ) + if not self.changed_cache_file(hash_) + ] + + def already_cached(self, path_info): + assert path_info.scheme in ["", "local"] + + current_md5 = self.tree.get_hash(path_info) + + if not current_md5: + return False + + return not self.changed_cache(current_md5) + + def _verify_link(self, path_info, link_type): + if link_type == "hardlink" and self.tree.getsize(path_info) == 0: + return + + super()._verify_link(path_info, link_type) + + @index_locked + def status( + self, + named_cache, + remote, + jobs=None, + show_checksums=False, + download=False, + ): + # Return flattened dict containing all status info + dir_status, file_status, _ = self._status( + named_cache, + remote, + jobs=jobs, + show_checksums=show_checksums, + download=download, + ) + return dict(dir_status, **file_status) + + def _status( + self, + named_cache, + remote, + jobs=None, + show_checksums=False, + download=False, + ): + """Return a tuple of (dir_status_info, file_status_info, dir_contents). + + dir_status_info contains status for .dir files, file_status_info + contains status for all other files, and dir_contents is a dict of + {dir_hash: set(file_hash, ...)} which can be used to map + a .dir file to its file contents. + """ + logger.debug( + f"Preparing to collect status from {remote.tree.path_info}" + ) + md5s = set(named_cache.scheme_keys(self.tree.scheme)) + + logger.debug("Collecting information from local cache...") + local_exists = frozenset( + self.hashes_exist(md5s, jobs=jobs, name=self.cache_dir) + ) + + # This is a performance optimization. We can safely assume that, + # if the resources that we want to fetch are already cached, + # there's no need to check the remote storage for the existence of + # those files. + if download and local_exists == md5s: + remote_exists = local_exists + else: + logger.debug("Collecting information from remote cache...") + remote_exists = set() + dir_md5s = set(named_cache.dir_keys(self.tree.scheme)) + if dir_md5s: + remote_exists.update( + self._indexed_dir_hashes(named_cache, remote, dir_md5s) + ) + md5s.difference_update(remote_exists) + if md5s: + remote_exists.update( + remote.hashes_exist( + md5s, jobs=jobs, name=str(remote.tree.path_info) + ) + ) + return self._make_status( + named_cache, show_checksums, local_exists, remote_exists + ) + + def _make_status( + self, named_cache, show_checksums, local_exists, remote_exists + ): + def make_names(hash_, names): + return {"name": hash_ if show_checksums else " ".join(names)} + + dir_status = {} + file_status = {} + dir_contents = {} + for hash_, item in named_cache[self.tree.scheme].items(): + if item.children: + dir_status[hash_] = make_names(hash_, item.names) + dir_contents[hash_] = set() + for child_hash, child in item.children.items(): + file_status[child_hash] = make_names( + child_hash, child.names + ) + dir_contents[hash_].add(child_hash) + else: + file_status[hash_] = make_names(hash_, item.names) + + self._fill_statuses(dir_status, local_exists, remote_exists) + self._fill_statuses(file_status, local_exists, remote_exists) + + self._log_missing_caches(dict(dir_status, **file_status)) + + return dir_status, file_status, dir_contents + + def _indexed_dir_hashes(self, named_cache, remote, dir_md5s): + # Validate our index by verifying all indexed .dir hashes + # still exist on the remote + indexed_dirs = set(remote.index.dir_hashes()) + indexed_dir_exists = set() + if indexed_dirs: + indexed_dir_exists.update( + remote.tree.list_hashes_exists(indexed_dirs) + ) + missing_dirs = indexed_dirs.difference(indexed_dir_exists) + if missing_dirs: + logger.debug( + "Remote cache missing indexed .dir hashes '{}', " + "clearing remote index".format(", ".join(missing_dirs)) + ) + remote.index.clear() + + # Check if non-indexed (new) dir hashes exist on remote + dir_exists = dir_md5s.intersection(indexed_dir_exists) + dir_exists.update( + remote.tree.list_hashes_exists(dir_md5s - dir_exists) + ) + + # If .dir hash exists on the remote, assume directory contents + # still exists on the remote + for dir_hash in dir_exists: + file_hashes = list( + named_cache.child_keys(self.tree.scheme, dir_hash) + ) + if dir_hash not in remote.index: + logger.debug( + "Indexing new .dir '{}' with '{}' nested files".format( + dir_hash, len(file_hashes) + ) + ) + remote.index.update([dir_hash], file_hashes) + yield dir_hash + yield from file_hashes + + @staticmethod + def _fill_statuses(hash_info_dir, local_exists, remote_exists): + # Using sets because they are way faster for lookups + local = set(local_exists) + remote = set(remote_exists) + + for md5, info in hash_info_dir.items(): + status = STATUS_MAP[(md5 in local, md5 in remote)] + info["status"] = status + + def _get_plans(self, download, remote, status_info, status): + cache = [] + path_infos = [] + names = [] + hashes = [] + for md5, info in Tqdm( + status_info.items(), desc="Analysing status", unit="file" + ): + if info["status"] == status: + cache.append(self.tree.hash_to_path_info(md5)) + path_infos.append(remote.tree.hash_to_path_info(md5)) + names.append(info["name"]) + hashes.append(md5) + + if download: + to_infos = cache + from_infos = path_infos + else: + to_infos = path_infos + from_infos = cache + + return from_infos, to_infos, names, hashes + + def _process( + self, + named_cache, + remote, + jobs=None, + show_checksums=False, + download=False, + ): + logger.debug( + "Preparing to {} '{}'".format( + "download data from" if download else "upload data to", + remote.tree.path_info, + ) + ) + + if download: + func = partial( + _log_exceptions(remote.tree.download, "download"), + dir_mode=self.tree.dir_mode, + file_mode=self.tree.file_mode, + ) + status = STATUS_DELETED + desc = "Downloading" + else: + func = _log_exceptions(remote.tree.upload, "upload") + status = STATUS_NEW + desc = "Uploading" + + if jobs is None: + jobs = remote.tree.JOBS + + dir_status, file_status, dir_contents = self._status( + named_cache, + remote, + jobs=jobs, + show_checksums=show_checksums, + download=download, + ) + + dir_plans = self._get_plans(download, remote, dir_status, status) + file_plans = self._get_plans(download, remote, file_status, status) + + total = len(dir_plans[0]) + len(file_plans[0]) + if total == 0: + return 0 + + with Tqdm(total=total, unit="file", desc=desc) as pbar: + func = pbar.wrap_fn(func) + with ThreadPoolExecutor(max_workers=jobs) as executor: + if download: + from_infos, to_infos, names, _ = ( + d + f for d, f in zip(dir_plans, file_plans) + ) + fails = sum( + executor.map(func, from_infos, to_infos, names) + ) + else: + # for uploads, push files first, and any .dir files last + + file_futures = {} + for from_info, to_info, name, hash_ in zip(*file_plans): + file_futures[hash_] = executor.submit( + func, from_info, to_info, name + ) + dir_futures = {} + for from_info, to_info, name, dir_hash in zip(*dir_plans): + wait_futures = { + future + for file_hash, future in file_futures.items() + if file_hash in dir_contents[dir_hash] + } + dir_futures[dir_hash] = executor.submit( + self._dir_upload, + func, + wait_futures, + from_info, + to_info, + name, + ) + fails = sum( + future.result() + for future in concat( + file_futures.values(), dir_futures.values() + ) + ) + + if fails: + if download: + remote.index.clear() + raise DownloadError(fails) + raise UploadError(fails) + + if not download: + # index successfully pushed dirs + for dir_hash, future in dir_futures.items(): + if future.result() == 0: + file_hashes = dir_contents[dir_hash] + logger.debug( + "Indexing pushed dir '{}' with " + "'{}' nested files".format(dir_hash, len(file_hashes)) + ) + remote.index.update([dir_hash], file_hashes) + + return len(dir_plans[0]) + len(file_plans[0]) + + @staticmethod + def _dir_upload(func, futures, from_info, to_info, name): + for future in as_completed(futures): + if future.result(): + # do not upload this .dir file if any file in this + # directory failed to upload + logger.debug( + "failed to upload full contents of '{}', " + "aborting .dir file upload".format(name) + ) + logger.error(f"failed to upload '{from_info}' to '{to_info}'") + return 1 + return func(from_info, to_info, name) + + @index_locked + def push(self, named_cache, remote, jobs=None, show_checksums=False): + return self._process( + named_cache, + remote, + jobs=jobs, + show_checksums=show_checksums, + download=False, + ) + + @index_locked + def pull(self, named_cache, remote, jobs=None, show_checksums=False): + return self._process( + named_cache, + remote, + jobs=jobs, + show_checksums=show_checksums, + download=True, + ) + + @staticmethod + def _log_missing_caches(hash_info_dict): + missing_caches = [ + (md5, info) + for md5, info in hash_info_dict.items() + if info["status"] == STATUS_MISSING + ] + if missing_caches: + missing_desc = "\n".join( + "name: {}, md5: {}".format(info["name"], md5) + for md5, info in missing_caches + ) + msg = ( + "Some of the cache files do not exist neither locally " + "nor on remote. Missing cache files:\n{}".format(missing_desc) + ) + logger.warning(msg) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 246868db87..8b2bd90943 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -1,42 +1,11 @@ import hashlib -import json import logging -from copy import copy from functools import wraps -from shortuuid import uuid - -import dvc.prompt as prompt -from dvc.exceptions import CheckoutError, ConfirmRemoveError, DvcException -from dvc.path_info import WindowsPathInfo -from dvc.progress import Tqdm -from dvc.remote.index import RemoteIndex, RemoteIndexNoop -from dvc.remote.slow_link_detection import slow_link_guard - -from ..tree.base import RemoteActionNotImplemented +from .index import RemoteIndex, RemoteIndexNoop logger = logging.getLogger(__name__) -STATUS_OK = 1 -STATUS_MISSING = 2 -STATUS_NEW = 3 -STATUS_DELETED = 4 - -STATUS_MAP = { - # (local_exists, remote_exists) - (True, True): STATUS_OK, - (False, False): STATUS_MISSING, - (True, False): STATUS_NEW, - (False, True): STATUS_DELETED, -} - - -class DirCacheError(DvcException): - def __init__(self, hash_): - super().__init__( - f"Failed to load dir cache for hash value: '{hash_}'." - ) - def index_locked(f): @wraps(f) @@ -198,522 +167,3 @@ def gc(cls, named_cache, remote, jobs=None): if removed and hasattr(remote, "index"): remote.index.clear() return removed - - -class CloudCache: - """Cloud cache class.""" - - DEFAULT_CACHE_TYPES = ["copy"] - CACHE_MODE = None - - def __init__(self, tree): - self.tree = tree - self.repo = tree.repo - - self.cache_types = tree.config.get("type") or copy( - self.DEFAULT_CACHE_TYPES - ) - self.cache_type_confirmed = False - self._dir_info = {} - - def get_dir_cache(self, hash_): - assert hash_ - - dir_info = self._dir_info.get(hash_) - if dir_info: - return dir_info - - try: - dir_info = self.load_dir_cache(hash_) - except DirCacheError: - dir_info = [] - - self._dir_info[hash_] = dir_info - return dir_info - - def load_dir_cache(self, hash_): - path_info = self.tree.hash_to_path_info(hash_) - - try: - with self.tree.open(path_info, "r") as fobj: - d = json.load(fobj) - except (ValueError, FileNotFoundError) as exc: - raise DirCacheError(hash_) from exc - - if not isinstance(d, list): - logger.error( - "dir cache file format error '%s' [skipping the file]", - path_info, - ) - return [] - - if self.tree.PATH_CLS == WindowsPathInfo: - # only need to convert it for Windows - for info in d: - # NOTE: here is a BUG, see comment to .as_posix() below - relpath = info[self.tree.PARAM_RELPATH] - info[self.tree.PARAM_RELPATH] = relpath.replace( - "/", self.tree.PATH_CLS.sep - ) - - return d - - def changed(self, path_info, hash_info): - """Checks if data has changed. - - A file is considered changed if: - - It doesn't exist on the working directory (was unlinked) - - Hash value is not computed (saving a new file) - - The hash value stored is different from the given one - - There's no file in the cache - - Args: - path_info: dict with path information. - hash: expected hash value for this data. - - Returns: - bool: True if data has changed, False otherwise. - """ - - logger.debug( - "checking if '%s'('%s') has changed.", path_info, hash_info - ) - - if not self.tree.exists(path_info): - logger.debug("'%s' doesn't exist.", path_info) - return True - - hash_ = hash_info.get(self.tree.PARAM_CHECKSUM) - if hash_ is None: - logger.debug("hash value for '%s' is missing.", path_info) - return True - - if self.changed_cache(hash_): - logger.debug("cache for '%s'('%s') has changed.", path_info, hash_) - return True - - actual = self.tree.get_hash(path_info) - if hash_ != actual: - logger.debug( - "hash value '%s' for '%s' has changed (actual '%s').", - hash_, - actual, - path_info, - ) - return True - - logger.debug("'%s' hasn't changed.", path_info) - return False - - def link(self, from_info, to_info): - self._link(from_info, to_info, self.cache_types) - - def _link(self, from_info, to_info, link_types): - assert self.tree.isfile(from_info) - - self.tree.makedirs(to_info.parent) - - self._try_links(from_info, to_info, link_types) - - def _verify_link(self, path_info, link_type): - if self.cache_type_confirmed: - return - - is_link = getattr(self.tree, f"is_{link_type}", None) - if is_link and not is_link(path_info): - self.tree.remove(path_info) - raise DvcException(f"failed to verify {link_type}") - - self.cache_type_confirmed = True - - @slow_link_guard - def _try_links(self, from_info, to_info, link_types): - while link_types: - link_method = getattr(self.tree, link_types[0]) - try: - self._do_link(from_info, to_info, link_method) - self._verify_link(to_info, link_types[0]) - return - - except DvcException as exc: - logger.debug( - "Cache type '%s' is not supported: %s", link_types[0], exc - ) - del link_types[0] - - raise DvcException("no possible cache types left to try out.") - - def _do_link(self, from_info, to_info, link_method): - if self.tree.exists(to_info): - raise DvcException(f"Link '{to_info}' already exists!") - - link_method(from_info, to_info) - - logger.debug( - "Created '%s': %s -> %s", self.cache_types[0], from_info, to_info, - ) - - def _save_file(self, path_info, tree, hash_, save_link=True, **kwargs): - assert hash_ - - cache_info = self.tree.hash_to_path_info(hash_) - if tree == self.tree: - if self.changed_cache(hash_): - self.tree.move(path_info, cache_info, mode=self.CACHE_MODE) - self.link(cache_info, path_info) - elif self.tree.iscopy(path_info) and self._cache_is_copy( - path_info - ): - # Default relink procedure involves unneeded copy - self.tree.unprotect(path_info) - else: - self.tree.remove(path_info) - self.link(cache_info, path_info) - - if save_link: - self.tree.state.save_link(path_info) - # we need to update path and cache, since in case of reflink, - # or copy cache type moving original file results in updates on - # next executed command, which causes md5 recalculation - self.tree.state.save(path_info, hash_) - else: - if self.changed_cache(hash_): - with tree.open(path_info, mode="rb") as fobj: - # if tree has fetch enabled, DVC out will be fetched on - # open and we do not need to read/copy any data - if not ( - tree.isdvc(path_info, strict=False) and tree.fetch - ): - self.tree.copy_fobj(fobj, cache_info) - callback = kwargs.get("download_callback") - if callback: - callback(1) - - self.tree.state.save(cache_info, hash_) - return {self.tree.PARAM_CHECKSUM: hash_} - - def _cache_is_copy(self, path_info): - """Checks whether cache uses copies.""" - if self.cache_type_confirmed: - return self.cache_types[0] == "copy" - - if set(self.cache_types) <= {"copy"}: - return True - - workspace_file = path_info.with_name("." + uuid()) - test_cache_file = self.tree.path_info / ".cache_type_test_file" - if not self.tree.exists(test_cache_file): - with self.tree.open(test_cache_file, "wb") as fobj: - fobj.write(bytes(1)) - try: - self.link(test_cache_file, workspace_file) - finally: - self.tree.remove(workspace_file) - self.tree.remove(test_cache_file) - - self.cache_type_confirmed = True - return self.cache_types[0] == "copy" - - def _save_dir(self, path_info, tree, hash_, save_link=True, **kwargs): - dir_info = self.get_dir_cache(hash_) - for entry in Tqdm( - dir_info, desc="Saving " + path_info.name, unit="file" - ): - entry_info = path_info / entry[self.tree.PARAM_RELPATH] - entry_hash = entry[self.tree.PARAM_CHECKSUM] - self._save_file( - entry_info, tree, entry_hash, save_link=False, **kwargs - ) - - if save_link: - self.tree.state.save_link(path_info) - if self.tree.exists(path_info): - self.tree.state.save(path_info, hash_) - - cache_info = self.tree.hash_to_path_info(hash_) - self.tree.state.save(cache_info, hash_) - return {self.tree.PARAM_CHECKSUM: hash_} - - def save(self, path_info, tree, hash_info, save_link=True, **kwargs): - if path_info.scheme != self.tree.scheme: - raise RemoteActionNotImplemented( - f"save {path_info.scheme} -> {self.tree.scheme}", - self.tree.scheme, - ) - - if not hash_info: - hash_info = self.tree.save_info(path_info, tree=tree, **kwargs) - hash_ = hash_info[self.tree.PARAM_CHECKSUM] - return self._save(path_info, tree, hash_, save_link, **kwargs) - - def _save(self, path_info, tree, hash_, save_link=True, **kwargs): - to_info = self.tree.hash_to_path_info(hash_) - logger.debug("Saving '%s' to '%s'.", path_info, to_info) - - if tree.isdir(path_info): - return self._save_dir(path_info, tree, hash_, save_link, **kwargs) - return self._save_file(path_info, tree, hash_, save_link, **kwargs) - - # Override to return path as a string instead of PathInfo for clouds - # which support string paths (see local) - def hash_to_path(self, hash_): - return self.tree.hash_to_path_info(hash_) - - def changed_cache_file(self, hash_): - """Compare the given hash with the (corresponding) actual one. - - - Use `State` as a cache for computed hashes - + The entries are invalidated by taking into account the following: - * mtime - * inode - * size - * hash - - - Remove the file from cache if it doesn't match the actual hash - """ - # Prefer string path over PathInfo when possible due to performance - cache_info = self.hash_to_path(hash_) - if self.tree.is_protected(cache_info): - logger.debug( - "Assuming '%s' is unchanged since it is read-only", cache_info - ) - return False - - actual = self.tree.get_hash(cache_info) - - logger.debug( - "cache '%s' expected '%s' actual '%s'", cache_info, hash_, actual, - ) - - if not hash_ or not actual: - return True - - if actual.split(".")[0] == hash_.split(".")[0]: - # making cache file read-only so we don't need to check it - # next time - self.tree.protect(cache_info) - return False - - if self.tree.exists(cache_info): - logger.warning("corrupted cache file '%s'.", cache_info) - self.tree.remove(cache_info) - - return True - - def _changed_dir_cache(self, hash_, path_info=None, filter_info=None): - if self.changed_cache_file(hash_): - return True - - for entry in self.get_dir_cache(hash_): - entry_hash = entry[self.tree.PARAM_CHECKSUM] - - if path_info and filter_info: - entry_info = path_info / entry[self.tree.PARAM_RELPATH] - if not entry_info.isin_or_eq(filter_info): - continue - - if self.changed_cache_file(entry_hash): - return True - - return False - - def changed_cache(self, hash_, path_info=None, filter_info=None): - if self.tree.is_dir_hash(hash_): - return self._changed_dir_cache( - hash_, path_info=path_info, filter_info=filter_info - ) - return self.changed_cache_file(hash_) - - def already_cached(self, path_info): - current = self.tree.get_hash(path_info) - - if not current: - return False - - return not self.changed_cache(current) - - def safe_remove(self, path_info, force=False): - if not self.tree.exists(path_info): - return - - if not force and not self.already_cached(path_info): - msg = ( - "file '{}' is going to be removed." - " Are you sure you want to proceed?".format(str(path_info)) - ) - - if not prompt.confirm(msg): - raise ConfirmRemoveError(str(path_info)) - - self.tree.remove(path_info) - - def _checkout_file( - self, path_info, hash_, force, progress_callback=None, relink=False - ): - """The file is changed we need to checkout a new copy""" - added, modified = True, False - cache_info = self.tree.hash_to_path_info(hash_) - if self.tree.exists(path_info): - logger.debug("data '%s' will be replaced.", path_info) - self.safe_remove(path_info, force=force) - added, modified = False, True - - self.link(cache_info, path_info) - self.tree.state.save_link(path_info) - self.tree.state.save(path_info, hash_) - if progress_callback: - progress_callback(str(path_info)) - - return added, modified and not relink - - def _checkout_dir( - self, - path_info, - hash_, - force, - progress_callback=None, - relink=False, - filter_info=None, - ): - added, modified = False, False - # Create dir separately so that dir is created - # even if there are no files in it - if not self.tree.exists(path_info): - added = True - self.tree.makedirs(path_info) - - dir_info = self.get_dir_cache(hash_) - - logger.debug("Linking directory '%s'.", path_info) - - for entry in dir_info: - relative_path = entry[self.tree.PARAM_RELPATH] - entry_hash = entry[self.tree.PARAM_CHECKSUM] - entry_cache_info = self.tree.hash_to_path_info(entry_hash) - entry_info = path_info / relative_path - - if filter_info and not entry_info.isin_or_eq(filter_info): - continue - - entry_hash_info = {self.tree.PARAM_CHECKSUM: entry_hash} - if relink or self.changed(entry_info, entry_hash_info): - modified = True - self.safe_remove(entry_info, force=force) - self.link(entry_cache_info, entry_info) - self.tree.state.save(entry_info, entry_hash) - if progress_callback: - progress_callback(str(entry_info)) - - modified = ( - self._remove_redundant_files(path_info, dir_info, force) - or modified - ) - - self.tree.state.save_link(path_info) - self.tree.state.save(path_info, hash_) - - # relink is not modified, assume it as nochange - return added, not added and modified and not relink - - def _remove_redundant_files(self, path_info, dir_info, force): - existing_files = set(self.tree.walk_files(path_info)) - - needed_files = { - path_info / entry[self.tree.PARAM_RELPATH] for entry in dir_info - } - redundant_files = existing_files - needed_files - for path in redundant_files: - self.safe_remove(path, force) - - return bool(redundant_files) - - def checkout( - self, - path_info, - hash_info, - force=False, - progress_callback=None, - relink=False, - filter_info=None, - ): - if path_info.scheme not in ["local", self.tree.scheme]: - raise NotImplementedError - - hash_ = hash_info.get(self.tree.PARAM_CHECKSUM) - failed = None - skip = False - if not hash_: - logger.warning( - "No file hash info found for '%s'. " "It won't be created.", - path_info, - ) - self.safe_remove(path_info, force=force) - failed = path_info - - elif not relink and not self.changed(path_info, hash_info): - logger.debug("Data '%s' didn't change.", path_info) - skip = True - - elif self.changed_cache( - hash_, path_info=path_info, filter_info=filter_info - ): - logger.warning( - "Cache '%s' not found. File '%s' won't be created.", - hash_, - path_info, - ) - self.safe_remove(path_info, force=force) - failed = path_info - - if failed or skip: - if progress_callback: - progress_callback( - str(path_info), - self.get_files_number( - self.tree.path_info, hash_, filter_info - ), - ) - if failed: - raise CheckoutError([failed]) - return - - logger.debug("Checking out '%s' with cache '%s'.", path_info, hash_) - - return self._checkout( - path_info, hash_, force, progress_callback, relink, filter_info, - ) - - def _checkout( - self, - path_info, - hash_, - force=False, - progress_callback=None, - relink=False, - filter_info=None, - ): - if not self.tree.is_dir_hash(hash_): - return self._checkout_file( - path_info, hash_, force, progress_callback, relink - ) - - return self._checkout_dir( - path_info, hash_, force, progress_callback, relink, filter_info - ) - - def get_files_number(self, path_info, hash_, filter_info): - from funcy.py3 import ilen - - if not hash_: - return 0 - - if not self.tree.is_dir_hash(hash_): - return 1 - - if not filter_info: - return len(self.get_dir_cache(hash_)) - - return ilen( - filter_info.isin_or_eq(path_info / entry[self.tree.PARAM_CHECKSUM]) - for entry in self.get_dir_cache(hash_) - ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 4285c92ef0..f433c1f703 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -1,437 +1,6 @@ -import errno -import logging -import os -from concurrent.futures import ThreadPoolExecutor, as_completed -from functools import partial, wraps - -from funcy import cached_property, concat - -from dvc.exceptions import DownloadError, UploadError -from dvc.path_info import PathInfo -from dvc.progress import Tqdm -from dvc.remote.base import ( - STATUS_DELETED, - STATUS_MAP, - STATUS_MISSING, - STATUS_NEW, - CloudCache, - Remote, - index_locked, -) -from dvc.remote.index import RemoteIndexNoop - -from ..tree.local import LocalTree - -logger = logging.getLogger(__name__) - - -def _log_exceptions(func, operation): - @wraps(func) - def wrapper(from_info, to_info, *args, **kwargs): - try: - func(from_info, to_info, *args, **kwargs) - return 0 - except Exception as exc: # pylint: disable=broad-except - # NOTE: this means we ran out of file descriptors and there is no - # reason to try to proceed, as we will hit this error anyways. - # pylint: disable=no-member - if isinstance(exc, OSError) and exc.errno == errno.EMFILE: - raise - - logger.exception( - "failed to %s '%s' to '%s'", operation, from_info, to_info - ) - return 1 - - return wrapper +from .base import Remote +from .index import RemoteIndexNoop class LocalRemote(Remote): INDEX_CLS = RemoteIndexNoop - - -class LocalCache(CloudCache): - DEFAULT_CACHE_TYPES = ["reflink", "copy"] - CACHE_MODE = LocalTree.CACHE_MODE - - def __init__(self, tree): - super().__init__(tree) - self.cache_dir = tree.config.get("url") - - @property - def cache_dir(self): - return self.tree.path_info.fspath if self.tree.path_info else None - - @cache_dir.setter - def cache_dir(self, value): - self.tree.path_info = PathInfo(value) if value else None - - @classmethod - def supported(cls, config): # pylint: disable=unused-argument - return True - - @cached_property - def cache_path(self): - return os.path.abspath(self.cache_dir) - - def hash_to_path(self, hash_): - # NOTE: `self.cache_path` is already normalized so we can simply use - # `os.sep` instead of `os.path.join`. This results in this helper - # being ~5.5 times faster. - return f"{self.cache_path}{os.sep}{hash_[0:2]}{os.sep}{hash_[2:]}" - - def hashes_exist( - self, hashes, jobs=None, name=None - ): # pylint: disable=unused-argument - return [ - hash_ - for hash_ in Tqdm( - hashes, - unit="file", - desc="Querying " - + ("cache in " + name if name else "local cache"), - ) - if not self.changed_cache_file(hash_) - ] - - def already_cached(self, path_info): - assert path_info.scheme in ["", "local"] - - current_md5 = self.tree.get_hash(path_info) - - if not current_md5: - return False - - return not self.changed_cache(current_md5) - - def _verify_link(self, path_info, link_type): - if link_type == "hardlink" and self.tree.getsize(path_info) == 0: - return - - super()._verify_link(path_info, link_type) - - @index_locked - def status( - self, - named_cache, - remote, - jobs=None, - show_checksums=False, - download=False, - ): - # Return flattened dict containing all status info - dir_status, file_status, _ = self._status( - named_cache, - remote, - jobs=jobs, - show_checksums=show_checksums, - download=download, - ) - return dict(dir_status, **file_status) - - def _status( - self, - named_cache, - remote, - jobs=None, - show_checksums=False, - download=False, - ): - """Return a tuple of (dir_status_info, file_status_info, dir_contents). - - dir_status_info contains status for .dir files, file_status_info - contains status for all other files, and dir_contents is a dict of - {dir_hash: set(file_hash, ...)} which can be used to map - a .dir file to its file contents. - """ - logger.debug( - f"Preparing to collect status from {remote.tree.path_info}" - ) - md5s = set(named_cache.scheme_keys(self.tree.scheme)) - - logger.debug("Collecting information from local cache...") - local_exists = frozenset( - self.hashes_exist(md5s, jobs=jobs, name=self.cache_dir) - ) - - # This is a performance optimization. We can safely assume that, - # if the resources that we want to fetch are already cached, - # there's no need to check the remote storage for the existence of - # those files. - if download and local_exists == md5s: - remote_exists = local_exists - else: - logger.debug("Collecting information from remote cache...") - remote_exists = set() - dir_md5s = set(named_cache.dir_keys(self.tree.scheme)) - if dir_md5s: - remote_exists.update( - self._indexed_dir_hashes(named_cache, remote, dir_md5s) - ) - md5s.difference_update(remote_exists) - if md5s: - remote_exists.update( - remote.hashes_exist( - md5s, jobs=jobs, name=str(remote.tree.path_info) - ) - ) - return self._make_status( - named_cache, show_checksums, local_exists, remote_exists - ) - - def _make_status( - self, named_cache, show_checksums, local_exists, remote_exists - ): - def make_names(hash_, names): - return {"name": hash_ if show_checksums else " ".join(names)} - - dir_status = {} - file_status = {} - dir_contents = {} - for hash_, item in named_cache[self.tree.scheme].items(): - if item.children: - dir_status[hash_] = make_names(hash_, item.names) - dir_contents[hash_] = set() - for child_hash, child in item.children.items(): - file_status[child_hash] = make_names( - child_hash, child.names - ) - dir_contents[hash_].add(child_hash) - else: - file_status[hash_] = make_names(hash_, item.names) - - self._fill_statuses(dir_status, local_exists, remote_exists) - self._fill_statuses(file_status, local_exists, remote_exists) - - self._log_missing_caches(dict(dir_status, **file_status)) - - return dir_status, file_status, dir_contents - - def _indexed_dir_hashes(self, named_cache, remote, dir_md5s): - # Validate our index by verifying all indexed .dir hashes - # still exist on the remote - indexed_dirs = set(remote.index.dir_hashes()) - indexed_dir_exists = set() - if indexed_dirs: - indexed_dir_exists.update( - remote.tree.list_hashes_exists(indexed_dirs) - ) - missing_dirs = indexed_dirs.difference(indexed_dir_exists) - if missing_dirs: - logger.debug( - "Remote cache missing indexed .dir hashes '{}', " - "clearing remote index".format(", ".join(missing_dirs)) - ) - remote.index.clear() - - # Check if non-indexed (new) dir hashes exist on remote - dir_exists = dir_md5s.intersection(indexed_dir_exists) - dir_exists.update( - remote.tree.list_hashes_exists(dir_md5s - dir_exists) - ) - - # If .dir hash exists on the remote, assume directory contents - # still exists on the remote - for dir_hash in dir_exists: - file_hashes = list( - named_cache.child_keys(self.tree.scheme, dir_hash) - ) - if dir_hash not in remote.index: - logger.debug( - "Indexing new .dir '{}' with '{}' nested files".format( - dir_hash, len(file_hashes) - ) - ) - remote.index.update([dir_hash], file_hashes) - yield dir_hash - yield from file_hashes - - @staticmethod - def _fill_statuses(hash_info_dir, local_exists, remote_exists): - # Using sets because they are way faster for lookups - local = set(local_exists) - remote = set(remote_exists) - - for md5, info in hash_info_dir.items(): - status = STATUS_MAP[(md5 in local, md5 in remote)] - info["status"] = status - - def _get_plans(self, download, remote, status_info, status): - cache = [] - path_infos = [] - names = [] - hashes = [] - for md5, info in Tqdm( - status_info.items(), desc="Analysing status", unit="file" - ): - if info["status"] == status: - cache.append(self.tree.hash_to_path_info(md5)) - path_infos.append(remote.tree.hash_to_path_info(md5)) - names.append(info["name"]) - hashes.append(md5) - - if download: - to_infos = cache - from_infos = path_infos - else: - to_infos = path_infos - from_infos = cache - - return from_infos, to_infos, names, hashes - - def _process( - self, - named_cache, - remote, - jobs=None, - show_checksums=False, - download=False, - ): - logger.debug( - "Preparing to {} '{}'".format( - "download data from" if download else "upload data to", - remote.tree.path_info, - ) - ) - - if download: - func = partial( - _log_exceptions(remote.tree.download, "download"), - dir_mode=self.tree.dir_mode, - file_mode=self.tree.file_mode, - ) - status = STATUS_DELETED - desc = "Downloading" - else: - func = _log_exceptions(remote.tree.upload, "upload") - status = STATUS_NEW - desc = "Uploading" - - if jobs is None: - jobs = remote.tree.JOBS - - dir_status, file_status, dir_contents = self._status( - named_cache, - remote, - jobs=jobs, - show_checksums=show_checksums, - download=download, - ) - - dir_plans = self._get_plans(download, remote, dir_status, status) - file_plans = self._get_plans(download, remote, file_status, status) - - total = len(dir_plans[0]) + len(file_plans[0]) - if total == 0: - return 0 - - with Tqdm(total=total, unit="file", desc=desc) as pbar: - func = pbar.wrap_fn(func) - with ThreadPoolExecutor(max_workers=jobs) as executor: - if download: - from_infos, to_infos, names, _ = ( - d + f for d, f in zip(dir_plans, file_plans) - ) - fails = sum( - executor.map(func, from_infos, to_infos, names) - ) - else: - # for uploads, push files first, and any .dir files last - - file_futures = {} - for from_info, to_info, name, hash_ in zip(*file_plans): - file_futures[hash_] = executor.submit( - func, from_info, to_info, name - ) - dir_futures = {} - for from_info, to_info, name, dir_hash in zip(*dir_plans): - wait_futures = { - future - for file_hash, future in file_futures.items() - if file_hash in dir_contents[dir_hash] - } - dir_futures[dir_hash] = executor.submit( - self._dir_upload, - func, - wait_futures, - from_info, - to_info, - name, - ) - fails = sum( - future.result() - for future in concat( - file_futures.values(), dir_futures.values() - ) - ) - - if fails: - if download: - remote.index.clear() - raise DownloadError(fails) - raise UploadError(fails) - - if not download: - # index successfully pushed dirs - for dir_hash, future in dir_futures.items(): - if future.result() == 0: - file_hashes = dir_contents[dir_hash] - logger.debug( - "Indexing pushed dir '{}' with " - "'{}' nested files".format(dir_hash, len(file_hashes)) - ) - remote.index.update([dir_hash], file_hashes) - - return len(dir_plans[0]) + len(file_plans[0]) - - @staticmethod - def _dir_upload(func, futures, from_info, to_info, name): - for future in as_completed(futures): - if future.result(): - # do not upload this .dir file if any file in this - # directory failed to upload - logger.debug( - "failed to upload full contents of '{}', " - "aborting .dir file upload".format(name) - ) - logger.error(f"failed to upload '{from_info}' to '{to_info}'") - return 1 - return func(from_info, to_info, name) - - @index_locked - def push(self, named_cache, remote, jobs=None, show_checksums=False): - return self._process( - named_cache, - remote, - jobs=jobs, - show_checksums=show_checksums, - download=False, - ) - - @index_locked - def pull(self, named_cache, remote, jobs=None, show_checksums=False): - return self._process( - named_cache, - remote, - jobs=jobs, - show_checksums=show_checksums, - download=True, - ) - - @staticmethod - def _log_missing_caches(hash_info_dict): - missing_caches = [ - (md5, info) - for md5, info in hash_info_dict.items() - if info["status"] == STATUS_MISSING - ] - if missing_caches: - missing_desc = "\n".join( - "name: {}, md5: {}".format(info["name"], md5) - for md5, info in missing_caches - ) - msg = ( - "Some of the cache files do not exist neither locally " - "nor on remote. Missing cache files:\n{}".format(missing_desc) - ) - logger.warning(msg) diff --git a/dvc/repo/status.py b/dvc/repo/status.py index 9efe6f91fe..4775d2345c 100644 --- a/dvc/repo/status.py +++ b/dvc/repo/status.py @@ -74,7 +74,7 @@ def _cloud_status( { "bar": "deleted" } """ - import dvc.remote.base as cloud + import dvc.cache.base as cloud used = self.used_cache( targets, diff --git a/dvc/repo/tree.py b/dvc/repo/tree.py index 9d13209001..51ceacfbb9 100644 --- a/dvc/repo/tree.py +++ b/dvc/repo/tree.py @@ -4,8 +4,7 @@ from dvc.dvcfile import is_valid_filename from dvc.exceptions import OutputNotFoundError from dvc.path_info import PathInfo -from dvc.remote.base import RemoteActionNotImplemented -from dvc.tree.base import BaseTree +from dvc.tree.base import BaseTree, RemoteActionNotImplemented from dvc.utils import file_md5 from dvc.utils.fs import copy_fobj_to_file, makedirs diff --git a/dvc/stage/cache.py b/dvc/stage/cache.py index 2ba6c1cb35..0802c036d5 100644 --- a/dvc/stage/cache.py +++ b/dvc/stage/cache.py @@ -6,7 +6,7 @@ from funcy import first from voluptuous import Invalid -from dvc.remote.local import _log_exceptions +from dvc.cache.local import _log_exceptions from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA from dvc.utils import dict_sha256, relpath from dvc.utils.fs import makedirs diff --git a/dvc/tree/base.py b/dvc/tree/base.py index f860dcecdf..6f7db8debc 100644 --- a/dvc/tree/base.py +++ b/dvc/tree/base.py @@ -25,19 +25,6 @@ logger = logging.getLogger(__name__) -STATUS_OK = 1 -STATUS_MISSING = 2 -STATUS_NEW = 3 -STATUS_DELETED = 4 - -STATUS_MAP = { - # (local_exists, remote_exists) - (True, True): STATUS_OK, - (False, False): STATUS_MISSING, - (True, False): STATUS_NEW, - (False, True): STATUS_DELETED, -} - class RemoteCmdError(DvcException): def __init__(self, remote, cmd, ret, err): diff --git a/tests/func/remote/test_index.py b/tests/func/remote/test_index.py index 4949f8ed5c..d33b452b62 100644 --- a/tests/func/remote/test_index.py +++ b/tests/func/remote/test_index.py @@ -5,7 +5,8 @@ from dvc.exceptions import DownloadError, UploadError from dvc.remote.base import Remote from dvc.remote.index import RemoteIndex -from dvc.remote.local import LocalRemote, LocalTree +from dvc.remote.local import LocalRemote +from dvc.tree.local import LocalTree from dvc.utils.fs import remove diff --git a/tests/func/test_add.py b/tests/func/test_add.py index c1e770a020..2e7c48182b 100644 --- a/tests/func/test_add.py +++ b/tests/func/test_add.py @@ -21,10 +21,10 @@ ) from dvc.main import main from dvc.output.base import OutputAlreadyTrackedError, OutputIsStageFileError -from dvc.remote.local import LocalTree from dvc.repo import Repo as DvcRepo from dvc.stage import Stage from dvc.system import System +from dvc.tree.local import LocalTree from dvc.utils import LARGE_DIR_SIZE, file_md5, relpath from dvc.utils.fs import path_isin from dvc.utils.yaml import load_yaml diff --git a/tests/func/test_cache.py b/tests/func/test_cache.py index 14c9dac8cd..93a6cdaebf 100644 --- a/tests/func/test_cache.py +++ b/tests/func/test_cache.py @@ -5,8 +5,8 @@ import pytest from dvc.cache import Cache +from dvc.cache.base import DirCacheError from dvc.main import main -from dvc.remote.base import DirCacheError from dvc.utils import relpath from tests.basic_env import TestDir, TestDvc diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index bc929f81dc..04e73483a0 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -7,6 +7,7 @@ import pytest from mock import patch +from dvc.cache.base import CloudCache from dvc.dvcfile import DVC_FILE_SUFFIX, PIPELINE_FILE, Dvcfile from dvc.exceptions import ( CheckoutError, @@ -16,7 +17,7 @@ NoOutputOrStageError, ) from dvc.main import main -from dvc.remote.base import CloudCache, Remote +from dvc.remote.base import Remote from dvc.repo import Repo as DvcRepo from dvc.stage import Stage from dvc.stage.exceptions import StageFileDoesNotExistError diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 29a71b8bdb..33381c6eff 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -6,11 +6,11 @@ from flaky.flaky_decorator import flaky from dvc.cache import NamedCache +from dvc.cache.base import STATUS_DELETED, STATUS_NEW, STATUS_OK from dvc.external_repo import clean_repos from dvc.main import main -from dvc.remote.base import STATUS_DELETED, STATUS_NEW, STATUS_OK -from dvc.remote.local import LocalTree from dvc.stage.exceptions import StageNotFound +from dvc.tree.local import LocalTree from dvc.utils.fs import remove from dvc.utils.yaml import dump_yaml, load_yaml diff --git a/tests/func/test_external_repo.py b/tests/func/test_external_repo.py index f3f7f62831..d8c96fbcd8 100644 --- a/tests/func/test_external_repo.py +++ b/tests/func/test_external_repo.py @@ -4,8 +4,8 @@ from dvc.external_repo import external_repo from dvc.path_info import PathInfo -from dvc.remote.local import LocalTree from dvc.scm.git import Git +from dvc.tree.local import LocalTree from dvc.utils import relpath from dvc.utils.fs import remove diff --git a/tests/func/test_gc.py b/tests/func/test_gc.py index 480fd794eb..d19f9a1f46 100644 --- a/tests/func/test_gc.py +++ b/tests/func/test_gc.py @@ -8,8 +8,8 @@ from dvc.exceptions import CollectCacheError from dvc.main import main -from dvc.remote.local import LocalTree from dvc.repo import Repo as DvcRepo +from dvc.tree.local import LocalTree from dvc.utils.fs import remove from tests.basic_env import TestDir, TestDvcGit diff --git a/tests/func/test_import.py b/tests/func/test_import.py index d94f451492..4e38f70d89 100644 --- a/tests/func/test_import.py +++ b/tests/func/test_import.py @@ -234,7 +234,7 @@ def test_download_error_pulling_imported_stage(tmp_dir, dvc, erepo_dir): remove(dst_cache) with patch( - "dvc.remote.local.LocalTree._download", side_effect=Exception + "dvc.tree.local.LocalTree._download", side_effect=Exception ), pytest.raises(DownloadError): dvc.pull(["foo_imported.dvc"]) diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index a17095ccbd..af0db38084 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -15,10 +15,10 @@ ) from dvc.main import main from dvc.output.base import BaseOutput -from dvc.remote.local import LocalTree from dvc.stage import Stage from dvc.stage.exceptions import StageFileDoesNotExistError from dvc.system import System +from dvc.tree.local import LocalTree from dvc.utils import file_md5, relpath from dvc.utils.fs import remove from dvc.utils.yaml import dump_yaml, load_yaml diff --git a/tests/func/test_s3.py b/tests/func/test_s3.py index 287f68e66c..f3d8447300 100644 --- a/tests/func/test_s3.py +++ b/tests/func/test_s3.py @@ -5,7 +5,7 @@ import pytest from moto import mock_s3 -from dvc.remote.base import CloudCache +from dvc.cache.base import CloudCache from dvc.tree.s3 import S3Tree from tests.remotes import S3 diff --git a/tests/func/test_stage.py b/tests/func/test_stage.py index be68c96d84..a87446cdbe 100644 --- a/tests/func/test_stage.py +++ b/tests/func/test_stage.py @@ -6,10 +6,10 @@ from dvc.dvcfile import SingleStageFile from dvc.main import main from dvc.output.local import LocalOutput -from dvc.remote.local import LocalTree from dvc.repo import Repo from dvc.stage import PipelineStage, Stage from dvc.stage.exceptions import StageFileFormatError +from dvc.tree.local import LocalTree from dvc.utils.yaml import dump_yaml, load_yaml from tests.basic_env import TestDvc diff --git a/tests/unit/output/test_local.py b/tests/unit/output/test_local.py index 488db4b0f3..23ecdb9997 100644 --- a/tests/unit/output/test_local.py +++ b/tests/unit/output/test_local.py @@ -2,8 +2,8 @@ from mock import patch +from dvc.cache.local import LocalCache from dvc.output import LocalOutput -from dvc.remote.local import LocalCache from dvc.stage import Stage from dvc.utils import relpath from tests.basic_env import TestDvc diff --git a/tests/unit/remote/test_local.py b/tests/unit/remote/test_local.py index 3f1c934348..f24454b8b6 100644 --- a/tests/unit/remote/test_local.py +++ b/tests/unit/remote/test_local.py @@ -4,9 +4,9 @@ import pytest from dvc.cache import NamedCache +from dvc.cache.local import LocalCache from dvc.path_info import PathInfo from dvc.remote.index import RemoteIndexNoop -from dvc.remote.local import LocalCache from dvc.tree.local import LocalTree