From 3314bf5899d95353fe60bdad81ae3c9649c0bc05 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 6 Apr 2020 16:40:01 +0900 Subject: [PATCH 01/32] remote: locally index list of checksums available on cloud remotes --- dvc/remote/base.py | 41 ++++++++++++++--- dvc/remote/index.py | 78 +++++++++++++++++++++++++++++++++ dvc/remote/local.py | 10 +++++ dvc/repo/__init__.py | 2 + tests/unit/remote/test_base.py | 2 +- tests/unit/remote/test_index.py | 36 +++++++++++++++ 6 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 dvc/remote/index.py create mode 100644 tests/unit/remote/test_index.py diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 93e22cdc2e..5dcd8c8d00 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -1,4 +1,5 @@ import errno +import hashlib import itertools import json import logging @@ -23,6 +24,7 @@ from dvc.ignore import DvcIgnore from dvc.path_info import PathInfo, URLInfo, WindowsPathInfo from dvc.progress import Tqdm +from dvc.remote.index import RemoteIndex from dvc.remote.slow_link_detection import slow_link_guard from dvc.state import StateNoop from dvc.utils import tmp_fname @@ -111,6 +113,13 @@ def __init__(self, repo, config): self.cache_types = config.get("type") or copy(self.DEFAULT_CACHE_TYPES) self.cache_type_confirmed = False + url = config.get("url") + if url: + index_name = hashlib.md5(url.encode("utf-8")).hexdigest() + else: + index_name = None + self.index = RemoteIndex(self.repo, index_name) + @classmethod def get_missing_deps(cls): import importlib @@ -752,8 +761,11 @@ def gc(self, named_cache, jobs=None): path_info = self.checksum_to_path_info(checksum) if self.is_dir_checksum(checksum): self._remove_unpacked_dir(checksum) + self.index.remove(checksum) self.remove(path_info) removed = True + if removed: + self.index.save() return removed def is_protected(self, path_info): @@ -872,10 +884,21 @@ def cache_exists(self, checksums, jobs=None, name=None): # cache_exists() (see ssh, local) assert self.TRAVERSE_PREFIX_LEN >= 2 - if len(checksums) == 1 or not self.CAN_TRAVERSE: - return self._cache_object_exists(checksums, jobs, name) + checksums = set(checksums) + indexed_checksums = checksums & self.index.checksums + checksums -= indexed_checksums + logger.debug( + "Matched {} indexed checksums".format(len(indexed_checksums)) + ) + if not checksums: + return indexed_checksums - checksums = frozenset(checksums) + if len(checksums) == 1 or not self.CAN_TRAVERSE: + remote_checksums = self._cache_object_exists(checksums, jobs, name) + if remote_checksums: + self.index.update(remote_checksums) + self.index.save() + return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method remote_size, remote_checksums = self._estimate_cache_size( @@ -907,10 +930,16 @@ def cache_exists(self, checksums, jobs=None, name=None): logger.debug( "Querying {} checksums via traverse".format(len(checksums)) ) - remote_checksums = self._cache_checksums_traverse( - remote_size, remote_checksums, jobs, name + remote_checksums = set( + self._cache_checksums_traverse( + remote_size, remote_checksums, jobs, name + ) + ) + self.index.replace(remote_checksums) + self.index.save() + return list(indexed_checksums) + list( + checksums & set(remote_checksums) ) - return list(checksums & set(remote_checksums)) def _checksums_with_limit( self, limit, prefix=None, progress_callback=None diff --git a/dvc/remote/index.py b/dvc/remote/index.py new file mode 100644 index 0000000000..d829947b90 --- /dev/null +++ b/dvc/remote/index.py @@ -0,0 +1,78 @@ +import logging +import pathlib +import pickle + +logger = logging.getLogger(__name__) + + +class RemoteIndex(object): + """Class for locally indexing remote checksums. + + Args: + repo: repo for this remote + """ + + INDEX_SUFFIX = ".idx" + + def __init__(self, repo, name): + if repo and hasattr(repo, "index_dir") and name: + self.path = pathlib.Path(repo.index_dir).joinpath( + "{}{}".format(name, self.INDEX_SUFFIX) + ) + else: + self.path = None + self._checksums = set() + self.load() + + def __iter__(self): + return iter(self._checksums) + + @property + def checksums(self): + return self._checksums + + def load(self): + """(Re)load this index from disk.""" + if self.path and self.path.exists(): + try: + with open(self.path, "rb") as fd: + self._checksums = pickle.load(fd) + except IOError: + logger.error( + "Failed to load remote index from '{}'".format(self.path) + ) + + def save(self): + """Save this index to disk.""" + if self.path: + try: + with open(self.path, "wb") as fd: + pickle.dump(self._checksums, fd) + except IOError: + logger.error( + "Failed to save remote index to '{}'".format(self.path) + ) + + def invalidate(self): + """Invalidate this index (to force re-indexing later).""" + self._checksums.clear() + if self.path and self.path.exists(): + self.path.unlink() + + def remove(self, checksum): + if checksum in self._checksums: + self._checksums.remove(checksum) + + def replace(self, checksums): + """Replace the full contents of this index with ``checksums``. + + Changes to the index will not be written to disk. + """ + self._checksums = set(checksums) + + def update(self, *checksums): + """Update this index, adding elements from ``checksums``. + + Changes to the index will not be written to disk. + """ + self._checksums.update(*checksums) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 16b846db52..e09f21ebc6 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -464,8 +464,18 @@ def _process( if fails: if download: + remote.index.invalidate() raise DownloadError(fails) raise UploadError(fails) + elif not download: + pushed_checksums = list(map(self.path_to_checksum, plans[0])) + logger.debug( + "Adding {} pushed checksums to index".format( + len(pushed_checksums) + ) + ) + remote.index.update(pushed_checksums) + remote.index.save() return len(dir_plans[0]) + len(file_plans[0]) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 66e33d2e7d..a3745f6f9d 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -86,6 +86,8 @@ def __init__(self, root_dir=None): self.tmp_dir = os.path.join(self.dvc_dir, "tmp") makedirs(self.tmp_dir, exist_ok=True) + self.index_dir = os.path.join(self.dvc_dir, "index") + makedirs(self.index_dir, exist_ok=True) hardlink_lock = self.config["core"].get("hardlink_lock", False) self.lock = make_lock( diff --git a/tests/unit/remote/test_base.py b/tests/unit/remote/test_base.py index e8cafa3b26..7334594627 100644 --- a/tests/unit/remote/test_base.py +++ b/tests/unit/remote/test_base.py @@ -52,7 +52,7 @@ def test_cache_exists(object_exists, traverse, dvc): with mock.patch.object( remote, "cache_checksums", return_value=list(range(256)) ): - checksums = list(range(1000)) + checksums = set(range(1000)) remote.cache_exists(checksums) object_exists.assert_called_with(checksums, None, None) traverse.assert_not_called() diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py new file mode 100644 index 0000000000..967b7076a3 --- /dev/null +++ b/tests/unit/remote/test_index.py @@ -0,0 +1,36 @@ +import pickle +import os.path + +from dvc.remote.index import RemoteIndex +from tests.basic_env import TestDvc + + +class TestRemoteIndex(TestDvc): + def test_init(self): + index = RemoteIndex(self.dvc, "foo") + assert str(index.path) == os.path.join(self.dvc.index_dir, "foo.idx") + + def test_load(self): + checksums = {1, 2, 3} + path = os.path.join(self.dvc.index_dir, "foo.idx") + with open(path, "wb") as fd: + pickle.dump(checksums, fd) + index = RemoteIndex(self.dvc, "foo") + assert index.checksums == checksums + + def test_save(self): + index = RemoteIndex(self.dvc, "foo") + index.replace({4, 5, 6}) + index.save() + path = os.path.join(self.dvc.index_dir, "foo.idx") + with open(path, "rb") as fd: + checksums = pickle.load(fd) + assert index.checksums == checksums + + def test_invalidate(self): + index = RemoteIndex(self.dvc, "foo") + index.replace({1, 2, 3}) + index.save() + index.invalidate() + assert not index.checksums + assert not index.path.exists() From da8a9d62ed6451fc09fdc954c384c078028ea55f Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 7 Apr 2020 12:36:05 +0900 Subject: [PATCH 02/32] fix review issues - use pytest fixtures - use sha256 digest - s/fd/fobj/ - check more specific IO errors --- dvc/remote/base.py | 2 +- dvc/remote/index.py | 30 +++++++++------ dvc/repo/__init__.py | 3 +- tests/unit/remote/test_index.py | 65 +++++++++++++++++---------------- 4 files changed, 53 insertions(+), 47 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 5dcd8c8d00..fa63eed6e1 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -115,7 +115,7 @@ def __init__(self, repo, config): url = config.get("url") if url: - index_name = hashlib.md5(url.encode("utf-8")).hexdigest() + index_name = hashlib.sha256(url.encode("utf-8")).hexdigest() else: index_name = None self.index = RemoteIndex(self.repo, index_name) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index d829947b90..dc1a6d7dfa 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -9,13 +9,17 @@ class RemoteIndex(object): """Class for locally indexing remote checksums. Args: - repo: repo for this remote + repo: repo for this remote index. + name: name for this index. If name is provided, this index will be + loaded from and saved to ``.dvc/tmp/index/{name}.idx``. + If name is not provided (i.e. for local remotes), this index will + be kept in memory but not saved to disk. """ INDEX_SUFFIX = ".idx" - def __init__(self, repo, name): - if repo and hasattr(repo, "index_dir") and name: + def __init__(self, repo, name=None): + if name: self.path = pathlib.Path(repo.index_dir).joinpath( "{}{}".format(name, self.INDEX_SUFFIX) ) @@ -33,24 +37,26 @@ def checksums(self): def load(self): """(Re)load this index from disk.""" - if self.path and self.path.exists(): + if self.path and self.path.is_file(): try: - with open(self.path, "rb") as fd: - self._checksums = pickle.load(fd) - except IOError: + with open(self.path, "rb") as fobj: + self._checksums = pickle.load(fobj) + except PermissionError: logger.error( - "Failed to load remote index from '{}'".format(self.path) + "Insufficient permissions to read index file " + "'{}'".format(self.path) ) def save(self): """Save this index to disk.""" if self.path: try: - with open(self.path, "wb") as fd: - pickle.dump(self._checksums, fd) - except IOError: + with open(self.path, "wb") as fobj: + pickle.dump(self._checksums, fobj) + except PermissionError: logger.error( - "Failed to save remote index to '{}'".format(self.path) + "Insufficient permissions to write index file " + "'{}'".format(self.path) ) def invalidate(self): diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index a3745f6f9d..c8d6631863 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -85,8 +85,7 @@ def __init__(self, root_dir=None): self.tree = WorkingTree(self.root_dir) self.tmp_dir = os.path.join(self.dvc_dir, "tmp") - makedirs(self.tmp_dir, exist_ok=True) - self.index_dir = os.path.join(self.dvc_dir, "index") + self.index_dir = os.path.join(self.tmp_dir, "index") makedirs(self.index_dir, exist_ok=True) hardlink_lock = self.config["core"].get("hardlink_lock", False) diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 967b7076a3..8799876ee1 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -2,35 +2,36 @@ import os.path from dvc.remote.index import RemoteIndex -from tests.basic_env import TestDvc - - -class TestRemoteIndex(TestDvc): - def test_init(self): - index = RemoteIndex(self.dvc, "foo") - assert str(index.path) == os.path.join(self.dvc.index_dir, "foo.idx") - - def test_load(self): - checksums = {1, 2, 3} - path = os.path.join(self.dvc.index_dir, "foo.idx") - with open(path, "wb") as fd: - pickle.dump(checksums, fd) - index = RemoteIndex(self.dvc, "foo") - assert index.checksums == checksums - - def test_save(self): - index = RemoteIndex(self.dvc, "foo") - index.replace({4, 5, 6}) - index.save() - path = os.path.join(self.dvc.index_dir, "foo.idx") - with open(path, "rb") as fd: - checksums = pickle.load(fd) - assert index.checksums == checksums - - def test_invalidate(self): - index = RemoteIndex(self.dvc, "foo") - index.replace({1, 2, 3}) - index.save() - index.invalidate() - assert not index.checksums - assert not index.path.exists() + + +def test_init(dvc): + index = RemoteIndex(dvc, "foo") + assert str(index.path) == os.path.join(dvc.index_dir, "foo.idx") + + +def test_load(dvc): + checksums = {1, 2, 3} + path = os.path.join(dvc.index_dir, "foo.idx") + with open(path, "wb") as fd: + pickle.dump(checksums, fd) + index = RemoteIndex(dvc, "foo") + assert index.checksums == checksums + + +def test_save(dvc): + index = RemoteIndex(dvc, "foo") + index.replace({4, 5, 6}) + index.save() + path = os.path.join(dvc.index_dir, "foo.idx") + with open(path, "rb") as fd: + checksums = pickle.load(fd) + assert index.checksums == checksums + + +def test_invalidate(dvc): + index = RemoteIndex(dvc, "foo") + index.replace({1, 2, 3}) + index.save() + index.invalidate() + assert not index.checksums + assert not index.path.exists() From aac712a99190a7460d5f724df0116a45fb67305d Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 15 Apr 2020 13:57:45 +0900 Subject: [PATCH 03/32] remote: use .dir checksums to validate index --- dvc/remote/index.py | 12 ++++++------ dvc/remote/local.py | 16 +++++++++++++++- tests/unit/remote/test_index.py | 2 +- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index dc1a6d7dfa..0a484c3df7 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -1,5 +1,5 @@ import logging -import pathlib +import os import pickle logger = logging.getLogger(__name__) @@ -20,8 +20,8 @@ class RemoteIndex(object): def __init__(self, repo, name=None): if name: - self.path = pathlib.Path(repo.index_dir).joinpath( - "{}{}".format(name, self.INDEX_SUFFIX) + self.path = os.path.join( + repo.index_dir, "{}{}".format(name, self.INDEX_SUFFIX) ) else: self.path = None @@ -37,7 +37,7 @@ def checksums(self): def load(self): """(Re)load this index from disk.""" - if self.path and self.path.is_file(): + if self.path and os.path.isfile(self.path): try: with open(self.path, "rb") as fobj: self._checksums = pickle.load(fobj) @@ -62,8 +62,8 @@ def save(self): def invalidate(self): """Invalidate this index (to force re-indexing later).""" self._checksums.clear() - if self.path and self.path.exists(): - self.path.unlink() + if self.path and os.path.isfile(self.path): + os.unlink(self.path) def remove(self, checksum): if checksum in self._checksums: diff --git a/dvc/remote/local.py b/dvc/remote/local.py index e09f21ebc6..a021749ff1 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -319,6 +319,18 @@ def _status( remote_exists.add(dir_checksum) md5s.difference_update(file_checksums) remote_exists.update(file_checksums) + # Validate our index by verifying all indexed .dir checksums + # still exist on the remote + missing_dirs = ( + remote.index.checksums.intersection(dir_md5s) + - remote_exists + ) + if missing_dirs: + logger.debug( + "Remote cache missing indexed .dir checksums '{}', " + "clearing local index".format(", ".join(missing_dirs)) + ) + remote.index.invalidate() if md5s: remote_exists.update( remote.cache_exists( @@ -468,7 +480,9 @@ def _process( raise DownloadError(fails) raise UploadError(fails) elif not download: - pushed_checksums = list(map(self.path_to_checksum, plans[0])) + pushed_checksums = list( + map(self.path_to_checksum, dir_plans[0] + file_plans[0]) + ) logger.debug( "Adding {} pushed checksums to index".format( len(pushed_checksums) diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 8799876ee1..3f6e9faa04 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -34,4 +34,4 @@ def test_invalidate(dvc): index.save() index.invalidate() assert not index.checksums - assert not index.path.exists() + assert not os.path.exists(index.path) From f586eca63f127e443a57b96967ef8a71e2fc0532 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 15 Apr 2020 14:09:06 +0900 Subject: [PATCH 04/32] command: add --drop-index option for push/pull/status --- dvc/command/data_sync.py | 20 +++++++++++++++++++ dvc/command/status.py | 1 + dvc/data_cloud.py | 43 +++++++++++++++++++++++++++++++++++----- dvc/remote/local.py | 28 ++++++++++++++++++++++++-- dvc/repo/fetch.py | 7 ++++++- dvc/repo/pull.py | 2 ++ dvc/repo/push.py | 3 ++- dvc/repo/status.py | 7 ++++++- 8 files changed, 101 insertions(+), 10 deletions(-) diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index 3091fc915a..2efc80d891 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -30,6 +30,7 @@ def run(self): with_deps=self.args.with_deps, force=self.args.force, recursive=self.args.recursive, + drop_index=self.args.drop_index, ) log_summary(stats) except (CheckoutError, DvcException) as exc: @@ -53,6 +54,7 @@ def run(self): all_commits=self.args.all_commits, with_deps=self.args.with_deps, recursive=self.args.recursive, + drop_index=self.args.drop_index, ) except DvcException: logger.exception("failed to push data to the cloud") @@ -162,6 +164,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Pull cache for subdirectories of the specified directory.", ) + pull_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the specified remote.", + ) pull_parser.set_defaults(func=CmdDataPull) # Push @@ -211,6 +219,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Push cache for subdirectories of specified directory.", ) + push_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the remote.", + ) push_parser.set_defaults(func=CmdDataPush) # Fetch @@ -334,4 +348,10 @@ def add_parser(subparsers, _parent_parser): default=False, help="Show status for all dependencies of the specified target.", ) + status_parser.add_argument( + "--drop-index", + action="store_true", + default=False, + help="Drop local index for the remote.", + ) status_parser.set_defaults(func=CmdDataStatus) diff --git a/dvc/command/status.py b/dvc/command/status.py index a441f71369..4585fd2ef4 100644 --- a/dvc/command/status.py +++ b/dvc/command/status.py @@ -49,6 +49,7 @@ def run(self): all_tags=self.args.all_tags, all_commits=self.args.all_commits, with_deps=self.args.with_deps, + drop_index=self.args.drop_index, ) if st: if self.args.quiet: diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index 6fe2683ee2..b14c38e056 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -48,7 +48,14 @@ def get_remote(self, remote=None, command=""): def _init_remote(self, remote): return Remote(self.repo, name=remote) - def push(self, cache, jobs=None, remote=None, show_checksums=False): + def push( + self, + cache, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Push data items in a cloud-agnostic way. Args: @@ -58,15 +65,24 @@ def push(self, cache, jobs=None, remote=None, show_checksums=False): By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ return self.repo.cache.local.push( cache, jobs=jobs, remote=self.get_remote(remote, "push"), show_checksums=show_checksums, + drop_index=drop_index, ) - def pull(self, cache, jobs=None, remote=None, show_checksums=False): + def pull( + self, + cache, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Pull data items in a cloud-agnostic way. Args: @@ -76,10 +92,15 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False): By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( - cache, jobs=jobs, remote=remote, show_checksums=show_checksums + cache, + jobs=jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) if not remote.verify: @@ -97,7 +118,14 @@ def _save_pulled_checksums(self, cache): # (see `RemoteBASE.download()`) self.repo.state.save(cache_file, checksum) - def status(self, cache, jobs=None, remote=None, show_checksums=False): + def status( + self, + cache, + jobs=None, + remote=None, + show_checksums=False, + drop_index=False, + ): """Check status of data items in a cloud-agnostic way. Args: @@ -108,8 +136,13 @@ def status(self, cache, jobs=None, remote=None, show_checksums=False): is used. show_checksums (bool): show checksums instead of file names in information messages. + drop_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( - cache, jobs=jobs, remote=remote, show_checksums=show_checksums + cache, + jobs=jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index a021749ff1..aab0c7fea5 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -256,6 +256,7 @@ def status( jobs=None, show_checksums=False, download=False, + drop_index=False, ): # Return flattened dict containing all status info dir_status, file_status, _ = self._status( @@ -264,6 +265,7 @@ def status( jobs=jobs, show_checksums=show_checksums, download=download, + drop_index=drop_index, ) return dict(dir_status, **file_status) @@ -274,6 +276,7 @@ def _status( jobs=None, show_checksums=False, download=False, + drop_index=False, ): """Return a tuple of (dir_status_info, file_status_info, dir_mapping). @@ -301,6 +304,9 @@ def _status( else: logger.debug("Collecting information from remote cache...") remote_exists = set() + if drop_index: + logger.debug("Clearing local index for remote cache") + remote.index.invalidate() dir_md5s = set(named_cache.dir_keys(self.scheme)) if dir_md5s: # If .dir checksum exists on the remote, assume directory @@ -404,6 +410,7 @@ def _process( jobs=None, show_checksums=False, download=False, + drop_index=False, ): logger.debug( "Preparing to {} '{}'".format( @@ -432,6 +439,7 @@ def _process( jobs=jobs, show_checksums=show_checksums, download=download, + drop_index=drop_index, ) dir_plans = self._get_plans(download, remote, dir_status, status) @@ -509,22 +517,38 @@ def _dir_upload(func, futures, from_info, to_info, name): return 1 return func(from_info, to_info, name) - def push(self, named_cache, remote, jobs=None, show_checksums=False): + def push( + self, + named_cache, + remote, + jobs=None, + show_checksums=False, + drop_index=False, + ): return self._process( named_cache, remote, jobs=jobs, show_checksums=show_checksums, download=False, + drop_index=drop_index, ) - def pull(self, named_cache, remote, jobs=None, show_checksums=False): + def pull( + self, + named_cache, + remote, + jobs=None, + show_checksums=False, + drop_index=False, + ): return self._process( named_cache, remote, jobs=jobs, show_checksums=show_checksums, download=True, + drop_index=drop_index, ) @staticmethod diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index 888f552da0..467870f8fa 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -20,6 +20,7 @@ def _fetch( all_tags=False, recursive=False, all_commits=False, + drop_index=False, ): """Download data items from a cloud and imported repositories @@ -50,7 +51,11 @@ def _fetch( try: downloaded += self.cloud.pull( - used, jobs, remote=remote, show_checksums=show_checksums + used, + jobs, + remote=remote, + show_checksums=show_checksums, + drop_index=drop_index, ) except NoRemoteError: if not used.external and used["local"]: diff --git a/dvc/repo/pull.py b/dvc/repo/pull.py index 4623f6a4ec..36d99f45b6 100644 --- a/dvc/repo/pull.py +++ b/dvc/repo/pull.py @@ -18,6 +18,7 @@ def pull( force=False, recursive=False, all_commits=False, + drop_index=False, ): processed_files_count = self._fetch( targets, @@ -28,6 +29,7 @@ def pull( all_commits=all_commits, with_deps=with_deps, recursive=recursive, + drop_index=drop_index, ) stats = self._checkout( targets=targets, with_deps=with_deps, force=force, recursive=recursive diff --git a/dvc/repo/push.py b/dvc/repo/push.py index 43946d37c0..ed515dd4f5 100644 --- a/dvc/repo/push.py +++ b/dvc/repo/push.py @@ -12,6 +12,7 @@ def push( all_tags=False, recursive=False, all_commits=False, + drop_index=False, ): used = self.used_cache( targets, @@ -24,4 +25,4 @@ def push( jobs=jobs, recursive=recursive, ) - return self.cloud.push(used, jobs, remote=remote) + return self.cloud.push(used, jobs, remote=remote, drop_index=drop_index) diff --git a/dvc/repo/status.py b/dvc/repo/status.py index 40780c66ec..4d48a81ba2 100644 --- a/dvc/repo/status.py +++ b/dvc/repo/status.py @@ -45,6 +45,7 @@ def _cloud_status( with_deps=False, all_tags=False, all_commits=False, + drop_index=False, ): """Returns a dictionary with the files that are new or deleted. @@ -86,7 +87,9 @@ def _cloud_status( ) ret = {} - status_info = self.cloud.status(used, jobs, remote=remote) + status_info = self.cloud.status( + used, jobs, remote=remote, drop_index=drop_index + ) for info in status_info.values(): name = info["name"] status = info["status"] @@ -111,6 +114,7 @@ def status( with_deps=False, all_tags=False, all_commits=False, + drop_index=False, ): if cloud or remote: return _cloud_status( @@ -122,6 +126,7 @@ def status( remote=remote, all_tags=all_tags, all_commits=all_commits, + drop_index=drop_index, ) ignored = list( From 581dd763e4869bb78517c730720f4154dbc5e737 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 15 Apr 2020 14:12:08 +0900 Subject: [PATCH 05/32] tests: --drop-index unit tests --- tests/unit/command/test_data_sync.py | 4 ++++ tests/unit/command/test_status.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/tests/unit/command/test_data_sync.py b/tests/unit/command/test_data_sync.py index 41d464dbd9..2bc083b70a 100644 --- a/tests/unit/command/test_data_sync.py +++ b/tests/unit/command/test_data_sync.py @@ -54,6 +54,7 @@ def test_pull(mocker): "--with-deps", "--force", "--recursive", + "--drop-index", ] ) assert cli_args.func == CmdDataPull @@ -73,6 +74,7 @@ def test_pull(mocker): with_deps=True, force=True, recursive=True, + drop_index=True, ) @@ -91,6 +93,7 @@ def test_push(mocker): "--all-commits", "--with-deps", "--recursive", + "--drop-index", ] ) assert cli_args.func == CmdDataPush @@ -109,4 +112,5 @@ def test_push(mocker): all_commits=True, with_deps=True, recursive=True, + drop_index=True, ) diff --git a/tests/unit/command/test_status.py b/tests/unit/command/test_status.py index 7b8ea33248..2663a6bc7c 100644 --- a/tests/unit/command/test_status.py +++ b/tests/unit/command/test_status.py @@ -17,6 +17,7 @@ def test_cloud_status(mocker): "--all-tags", "--all-commits", "--with-deps", + "--drop-index", ] ) assert cli_args.func == CmdDataStatus @@ -35,4 +36,5 @@ def test_cloud_status(mocker): all_tags=True, all_commits=True, with_deps=True, + drop_index=True, ) From cc6fe9f1008088a8446b09119530d81f488c471f Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 15 Apr 2020 14:22:54 +0900 Subject: [PATCH 06/32] remote: make index threadsafe --- dvc/remote/index.py | 47 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 0a484c3df7..2a0c8d3582 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -1,6 +1,7 @@ import logging import os import pickle +import threading logger = logging.getLogger(__name__) @@ -25,7 +26,9 @@ def __init__(self, repo, name=None): ) else: self.path = None + self.lock = threading.Lock() self._checksums = set() + self.modified = False self.load() def __iter__(self): @@ -38,47 +41,75 @@ def checksums(self): def load(self): """(Re)load this index from disk.""" if self.path and os.path.isfile(self.path): + self.lock.acquire() try: with open(self.path, "rb") as fobj: self._checksums = pickle.load(fobj) - except PermissionError: + self.modified = False + except IOError as exc: logger.error( - "Insufficient permissions to read index file " - "'{}'".format(self.path) + "Failed to load remote index file '{}'. " + "Remote will be re-indexed: '{}'".format(self.path, exc) ) + finally: + self.lock.release() def save(self): """Save this index to disk.""" - if self.path: + if self.path and self.modified: + self.lock.acquire() try: with open(self.path, "wb") as fobj: pickle.dump(self._checksums, fobj) - except PermissionError: + self.modified = False + except IOError as exc: logger.error( - "Insufficient permissions to write index file " - "'{}'".format(self.path) + "Failed to save remote index file '{}': {}".format( + self.path, exc + ) ) + finally: + self.lock.release() def invalidate(self): """Invalidate this index (to force re-indexing later).""" + self.lock.acquire() self._checksums.clear() + self.modified = True if self.path and os.path.isfile(self.path): - os.unlink(self.path) + try: + os.unlink(self.path) + except IOError as exc: + logger.error( + "Failed to remove remote index file '{}': {}".format( + self.path, exc + ) + ) + self.lock.release() def remove(self, checksum): if checksum in self._checksums: + self.lock.acquire() self._checksums.remove(checksum) + self.modified = True + self.lock.release() def replace(self, checksums): """Replace the full contents of this index with ``checksums``. Changes to the index will not be written to disk. """ + self.lock.acquire() self._checksums = set(checksums) + self.modified = True + self.lock.release() def update(self, *checksums): """Update this index, adding elements from ``checksums``. Changes to the index will not be written to disk. """ + self.lock.acquire() self._checksums.update(*checksums) + self.modified = True + self.lock.release() From 68c6a9cc7c89b577322887cc0d4c842388207830 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 15 Apr 2020 14:35:01 +0900 Subject: [PATCH 07/32] remote: force re-index after gc --- dvc/remote/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index fa63eed6e1..568dc5870c 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -761,10 +761,10 @@ def gc(self, named_cache, jobs=None): path_info = self.checksum_to_path_info(checksum) if self.is_dir_checksum(checksum): self._remove_unpacked_dir(checksum) - self.index.remove(checksum) self.remove(path_info) removed = True if removed: + self.index.invalidate() self.index.save() return removed From fbd76a3c6c22d78be5ef3eb09aa9696c3bc9b204 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 15 Apr 2020 14:43:29 +0900 Subject: [PATCH 08/32] only save/dump index once per command --- dvc/remote/base.py | 2 -- dvc/remote/local.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 568dc5870c..780030b2b3 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -897,7 +897,6 @@ def cache_exists(self, checksums, jobs=None, name=None): remote_checksums = self._cache_object_exists(checksums, jobs, name) if remote_checksums: self.index.update(remote_checksums) - self.index.save() return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method @@ -936,7 +935,6 @@ def cache_exists(self, checksums, jobs=None, name=None): ) ) self.index.replace(remote_checksums) - self.index.save() return list(indexed_checksums) + list( checksums & set(remote_checksums) ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index aab0c7fea5..671435529f 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -267,6 +267,7 @@ def status( download=download, drop_index=drop_index, ) + remote.index.save() return dict(dir_status, **file_status) def _status( From adc50bd195e52a00d147c7e532b193124f7ebe06 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 15 Apr 2020 14:48:25 +0900 Subject: [PATCH 09/32] remote: add helper index functions update_all/replace_all --- dvc/remote/base.py | 4 +-- dvc/remote/index.py | 40 ++++++++++++++++-------- dvc/remote/local.py | 11 ++++--- tests/unit/remote/test_index.py | 55 +++++++++++++++++++++++++++++++-- 4 files changed, 89 insertions(+), 21 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 780030b2b3..b40a0e34cb 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -896,7 +896,7 @@ def cache_exists(self, checksums, jobs=None, name=None): if len(checksums) == 1 or not self.CAN_TRAVERSE: remote_checksums = self._cache_object_exists(checksums, jobs, name) if remote_checksums: - self.index.update(remote_checksums) + self.index.update_all(remote_checksums) return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method @@ -934,7 +934,7 @@ def cache_exists(self, checksums, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) ) - self.index.replace(remote_checksums) + self.index.replace_all(remote_checksums) return list(indexed_checksums) + list( checksums & set(remote_checksums) ) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 2a0c8d3582..ce035edbb4 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -3,6 +3,8 @@ import pickle import threading +from funcy import split + logger = logging.getLogger(__name__) @@ -15,6 +17,7 @@ class RemoteIndex(object): loaded from and saved to ``.dvc/tmp/index/{name}.idx``. If name is not provided (i.e. for local remotes), this index will be kept in memory but not saved to disk. + dir_suffix: suffix used for naming directory checksums """ INDEX_SUFFIX = ".idx" @@ -87,29 +90,40 @@ def invalidate(self): ) self.lock.release() - def remove(self, checksum): - if checksum in self._checksums: - self.lock.acquire() - self._checksums.remove(checksum) - self.modified = True - self.lock.release() - - def replace(self, checksums): - """Replace the full contents of this index with ``checksums``. + def replace(self, dir_checksums, file_checksums): + """Replace the contents of this index with the specified checksums. Changes to the index will not be written to disk. """ self.lock.acquire() - self._checksums = set(checksums) + self._dir_checksums = set(dir_checksums) + self._file_checksums = set(file_checksums) self.modified = True self.lock.release() - def update(self, *checksums): - """Update this index, adding elements from ``checksums``. + def replace_all(self, *checksums): + """Replace the contents of this index with the specified checksums. + + Changes to the index will not be written to disk. + """ + dir_checksums, file_checksums = split(self.is_dir_checksum, *checksums) + self.replace(dir_checksums, file_checksums) + + def update(self, dir_checksums, file_checksums): + """Update this index, adding the specified checksums. Changes to the index will not be written to disk. """ self.lock.acquire() - self._checksums.update(*checksums) + self._dir_checksums.update(dir_checksums) + self._file_checksums.update(file_checksums) self.modified = True self.lock.release() + + def update_all(self, *checksums): + """Update this index, adding the specified checksums. + + Changes to the index will not be written to disk. + """ + dir_checksums, file_checksums = split(self.is_dir_checksum, *checksums) + self.update(dir_checksums, file_checksums) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 671435529f..82873db3dd 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -489,15 +489,18 @@ def _process( raise DownloadError(fails) raise UploadError(fails) elif not download: - pushed_checksums = list( - map(self.path_to_checksum, dir_plans[0] + file_plans[0]) + pushed_dir_checksums = list( + map(self.path_to_checksum, dir_plans[0]) + ) + pushed_file_checksums = list( + map(self.path_to_checksum, file_plans[0]) ) logger.debug( "Adding {} pushed checksums to index".format( - len(pushed_checksums) + len(pushed_dir_checksums) + len(pushed_file_checksums) ) ) - remote.index.update(pushed_checksums) + remote.index.update(pushed_dir_checksums, pushed_file_checksums) remote.index.save() return len(dir_plans[0]) + len(file_plans[0]) diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 3f6e9faa04..ab6814fe13 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -20,7 +20,9 @@ def test_load(dvc): def test_save(dvc): index = RemoteIndex(dvc, "foo") - index.replace({4, 5, 6}) + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(expected_dir, expected_file) index.save() path = os.path.join(dvc.index_dir, "foo.idx") with open(path, "rb") as fd: @@ -30,8 +32,57 @@ def test_save(dvc): def test_invalidate(dvc): index = RemoteIndex(dvc, "foo") - index.replace({1, 2, 3}) + index.replace( + ["0123456789abcdef0123456789abcdef.dir"], + ["fedcba9876543210fedcba9876543210"], + ) index.save() index.invalidate() assert not index.checksums assert not os.path.exists(index.path) + + +def test_replace(dvc): + index = RemoteIndex(dvc, "foo") + index._dir_checksums = set() + index._file_checksums = set() + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(expected_dir, expected_file) + assert index._dir_checksums == expected_dir + assert index._file_checksums == expected_file + + +def test_replace_all(dvc): + index = RemoteIndex(dvc, "foo") + index._dir_checksums = set() + index._file_checksums = set() + expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace_all(expected_dir | expected_file) + assert index._dir_checksums == expected_dir + assert index._file_checksums == expected_file + + +def test_update(dvc): + index = RemoteIndex(dvc, "foo") + initial_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + initial_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(initial_dir, initial_file) + expected_dir = frozenset(["1123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543211"]) + index.update(expected_dir, expected_file) + assert index._dir_checksums == initial_dir | expected_dir + assert index._file_checksums == initial_file | expected_file + + +def test_update_all(dvc): + index = RemoteIndex(dvc, "foo") + initial_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) + initial_file = frozenset(["fedcba9876543210fedcba9876543210"]) + index.replace(initial_dir, initial_file) + expected_dir = frozenset(["1123456789abcdef0123456789abcdef.dir"]) + expected_file = frozenset(["fedcba9876543210fedcba9876543211"]) + index.update_all(expected_dir | expected_file) + assert index._dir_checksums == initial_dir | expected_dir + assert index._file_checksums == initial_file | expected_file From 7c194bde90c85906dea4777776170f6d80d326d0 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 15 Apr 2020 17:54:39 +0900 Subject: [PATCH 10/32] remote: store index as sqlite3 database --- dvc/remote/base.py | 27 +++- dvc/remote/index.py | 271 ++++++++++++++++++++++---------- dvc/remote/local.py | 29 ++-- tests/unit/remote/test_index.py | 111 ++++++------- 4 files changed, 272 insertions(+), 166 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index b40a0e34cb..82a4d8d94d 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -7,7 +7,7 @@ from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor from copy import copy -from functools import partial +from functools import partial, wraps from multiprocessing import cpu_count from operator import itemgetter @@ -24,7 +24,7 @@ from dvc.ignore import DvcIgnore from dvc.path_info import PathInfo, URLInfo, WindowsPathInfo from dvc.progress import Tqdm -from dvc.remote.index import RemoteIndex +from dvc.remote.index import RemoteIndex, RemoteIndexNoop from dvc.remote.slow_link_detection import slow_link_guard from dvc.state import StateNoop from dvc.utils import tmp_fname @@ -72,11 +72,24 @@ def __init__(self, checksum): ) +def index_locked(f): + @wraps(f) + def wrapper(remote_obj, *args, **kwargs): + remote = kwargs.get("remote") + if remote: + with remote.index: + return f(remote_obj, *args, **kwargs) + return f(remote_obj, *args, **kwargs) + + return wrapper + + class RemoteBASE(object): scheme = "base" path_cls = URLInfo REQUIRES = {} JOBS = 4 * cpu_count() + INDEX_CLS = RemoteIndex PARAM_RELPATH = "relpath" CHECKSUM_DIR_SUFFIX = ".dir" @@ -116,9 +129,11 @@ def __init__(self, repo, config): url = config.get("url") if url: index_name = hashlib.sha256(url.encode("utf-8")).hexdigest() + self.index = self.INDEX_CLS( + self.repo, index_name, dir_suffix=self.CHECKSUM_DIR_SUFFIX + ) else: - index_name = None - self.index = RemoteIndex(self.repo, index_name) + self.index = RemoteIndexNoop() @classmethod def get_missing_deps(cls): @@ -743,6 +758,7 @@ def all(self, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) + @index_locked def gc(self, named_cache, jobs=None): used = self.extract_used_local_checksums(named_cache) @@ -765,7 +781,6 @@ def gc(self, named_cache, jobs=None): removed = True if removed: self.index.invalidate() - self.index.save() return removed def is_protected(self, path_info): @@ -885,7 +900,7 @@ def cache_exists(self, checksums, jobs=None, name=None): assert self.TRAVERSE_PREFIX_LEN >= 2 checksums = set(checksums) - indexed_checksums = checksums & self.index.checksums + indexed_checksums = checksums.intersection(self.index.checksums()) checksums -= indexed_checksums logger.debug( "Matched {} indexed checksums".format(len(indexed_checksums)) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index ce035edbb4..ae282d123a 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -1,129 +1,228 @@ import logging import os -import pickle +import sqlite3 import threading -from funcy import split +from dvc.state import _connect_sqlite logger = logging.getLogger(__name__) -class RemoteIndex(object): - """Class for locally indexing remote checksums. +class RemoteIndexNoop: + """No-op class for remotes which are not indexed (i.e. local).""" + + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + pass + + def __exit__(self, typ, value, tbck): + pass + + def checksums(self): + return [] + + def dir_checksums(self): + return [] + + def load(self): + pass + + def dump(self): + pass + + def invalidate(self): + pass + + def replace(self, *args): + pass + + def replace_all(self, *args): + pass + + def update(self, *args): + pass + + def update_all(self, *args): + pass + + +class RemoteIndex: + """Class for indexing remote checksums in a sqlite3 database. Args: repo: repo for this remote index. - name: name for this index. If name is provided, this index will be - loaded from and saved to ``.dvc/tmp/index/{name}.idx``. - If name is not provided (i.e. for local remotes), this index will - be kept in memory but not saved to disk. + name: name for this index. Index db will be loaded from and saved to + ``.dvc/tmp/index/{name}.idx``. dir_suffix: suffix used for naming directory checksums """ INDEX_SUFFIX = ".idx" - - def __init__(self, repo, name=None): - if name: - self.path = os.path.join( - repo.index_dir, "{}{}".format(name, self.INDEX_SUFFIX) - ) - else: - self.path = None - self.lock = threading.Lock() - self._checksums = set() + VERSION = 1 + INDEX_TABLE = "remote_index" + INDEX_TABLE_LAYOUT = "checksum TEXT PRIMARY KEY, " "dir INTEGER NOT NULL" + + def __init__(self, repo, name, dir_suffix=".dir"): + self.path = os.path.join( + repo.index_dir, "{}{}".format(name, self.INDEX_SUFFIX) + ) + + self.dir_suffix = dir_suffix + self.database = None + self.cursor = None self.modified = False - self.load() + self.lock = threading.Lock() def __iter__(self): - return iter(self._checksums) + cmd = "SELECT checksum FROM {}".format(self.INDEX_TABLE) + for (checksum,) in self._execute(cmd): + yield checksum - @property - def checksums(self): - return self._checksums + def __enter__(self): + self.lock.acquire() + self.load() - def load(self): - """(Re)load this index from disk.""" - if self.path and os.path.isfile(self.path): - self.lock.acquire() - try: - with open(self.path, "rb") as fobj: - self._checksums = pickle.load(fobj) - self.modified = False - except IOError as exc: - logger.error( - "Failed to load remote index file '{}'. " - "Remote will be re-indexed: '{}'".format(self.path, exc) - ) - finally: - self.lock.release() + def __exit__(self, typ, value, tbck): + self.dump() + self.lock.release() - def save(self): - """Save this index to disk.""" - if self.path and self.modified: - self.lock.acquire() - try: - with open(self.path, "wb") as fobj: - pickle.dump(self._checksums, fobj) - self.modified = False - except IOError as exc: + def checksums(self): + """Iterate over checksums stored in the index.""" + return iter(self) + + def dir_checksums(self): + """Iterate over .dir checksums stored in the index.""" + cmd = "SELECT checksum FROM {} WHERE dir = 1".format(self.INDEX_TABLE) + for (checksum,) in self._execute(cmd): + yield checksum + + def is_dir_checksum(self, checksum): + return checksum.endswith(self.dir_suffix) + + def _execute(self, cmd, parameters=()): + logger.debug(cmd) + return self.cursor.execute(cmd, parameters) + + def _executemany(self, cmd, seq_of_parameters): + logger.debug(cmd) + return self.cursor.executemany(cmd, seq_of_parameters) + + def _prepare_db(self, empty=False): + if not empty: + cmd = "PRAGMA user_version;" + self._execute(cmd) + ret = self.cursor.fetchall() + assert len(ret) == 1 + assert len(ret[0]) == 1 + assert isinstance(ret[0][0], int) + version = ret[0][0] + + if version != self.VERSION: logger.error( - "Failed to save remote index file '{}': {}".format( - self.path, exc + "Index file version '{}' will be reformatted " + "to the current version '{}'.".format( + version, self.VERSION, ) ) - finally: - self.lock.release() + cmd = "DROP TABLE IF EXISTS {};" + self._execute(cmd.format(self.INDEX_TABLE)) + + cmd = "CREATE TABLE IF NOT EXISTS {} ({})" + self._execute(cmd.format(self.INDEX_TABLE, self.INDEX_TABLE_LAYOUT)) + + cmd = "PRAGMA user_version = {};" + self._execute(cmd.format(self.VERSION)) + + def load(self): + """(Re)load this index database.""" + retries = 1 + while True: + assert self.database is None + assert self.cursor is None + + empty = not os.path.isfile(self.path) + self.database = _connect_sqlite(self.path, {"nolock": 1}) + self.cursor = self.database.cursor() - def invalidate(self): - """Invalidate this index (to force re-indexing later).""" - self.lock.acquire() - self._checksums.clear() - self.modified = True - if self.path and os.path.isfile(self.path): try: - os.unlink(self.path) - except IOError as exc: - logger.error( - "Failed to remove remote index file '{}': {}".format( - self.path, exc - ) - ) - self.lock.release() + self._prepare_db(empty=empty) + return + except sqlite3.DatabaseError: + self.cursor.close() + self.database.close() + self.database = None + self.cursor = None + if retries > 0: + os.unlink(self.path) + retries -= 1 + else: + raise + + def dump(self): + """Save this index database.""" + assert self.database is not None + + self.database.commit() + self.cursor.close() + self.database.close() + self.database = None + self.cursor = None + + def _clear(self): + cmd = "DELETE FROM {}".format(self.INDEX_TABLE) + self._execute(cmd) + + def invalidate(self): + """Invalidate this index (to force re-indexing later). + + Changes to the index will not committed until dump() is called. + """ + self._clear() def replace(self, dir_checksums, file_checksums): """Replace the contents of this index with the specified checksums. - Changes to the index will not be written to disk. + Changes to the index will not committed until dump() is called. """ - self.lock.acquire() - self._dir_checksums = set(dir_checksums) - self._file_checksums = set(file_checksums) - self.modified = True - self.lock.release() + self._clear() + self.update(dir_checksums, file_checksums) - def replace_all(self, *checksums): + def replace_all(self, checksums): """Replace the contents of this index with the specified checksums. - Changes to the index will not be written to disk. + Changes to the index will not committed until dump() is called. """ - dir_checksums, file_checksums = split(self.is_dir_checksum, *checksums) - self.replace(dir_checksums, file_checksums) + self._clear() + self.update_all(checksums) def update(self, dir_checksums, file_checksums): """Update this index, adding the specified checksums. - Changes to the index will not be written to disk. + Changes to the index will not committed until dump() is called. """ - self.lock.acquire() - self._dir_checksums.update(dir_checksums) - self._file_checksums.update(file_checksums) - self.modified = True - self.lock.release() - - def update_all(self, *checksums): + cmd = "INSERT OR IGNORE INTO {} (checksum, dir) VALUES (?, ?)".format( + self.INDEX_TABLE + ) + self._executemany( + cmd, ((checksum, True) for checksum in dir_checksums) + ) + self._executemany( + cmd, ((checksum, False) for checksum in file_checksums) + ) + + def update_all(self, checksums): """Update this index, adding the specified checksums. - Changes to the index will not be written to disk. + Changes to the index will not committed until dump() is called. """ - dir_checksums, file_checksums = split(self.is_dir_checksum, *checksums) - self.update(dir_checksums, file_checksums) + cmd = "INSERT OR IGNORE INTO {} (checksum, dir) VALUES (?, ?)".format( + self.INDEX_TABLE + ) + self._executemany( + cmd, + ( + (checksum, self.is_dir_checksum(checksum)) + for checksum in checksums + ), + ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 82873db3dd..d5eaa62895 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -13,8 +13,15 @@ from dvc.exceptions import DvcException, DownloadError, UploadError from dvc.path_info import PathInfo from dvc.progress import Tqdm -from dvc.remote.base import RemoteBASE, STATUS_MAP -from dvc.remote.base import STATUS_DELETED, STATUS_MISSING, STATUS_NEW +from dvc.remote.base import ( + index_locked, + RemoteBASE, + STATUS_MAP, + STATUS_DELETED, + STATUS_MISSING, + STATUS_NEW, +) +from dvc.remote.index import RemoteIndexNoop from dvc.scheme import Schemes from dvc.scm.tree import is_working_tree from dvc.system import System @@ -30,6 +37,7 @@ class RemoteLOCAL(RemoteBASE): PARAM_CHECKSUM = "md5" PARAM_PATH = "path" TRAVERSE_PREFIX_LEN = 2 + INDEX_CLS = RemoteIndexNoop UNPACKED_DIR_SUFFIX = ".unpacked" @@ -249,6 +257,7 @@ def _download( def open(path_info, mode="r", encoding=None): return open(fspath_py35(path_info), mode=mode, encoding=encoding) + @index_locked def status( self, named_cache, @@ -267,7 +276,6 @@ def status( download=download, drop_index=drop_index, ) - remote.index.save() return dict(dir_status, **file_status) def _status( @@ -329,7 +337,7 @@ def _status( # Validate our index by verifying all indexed .dir checksums # still exist on the remote missing_dirs = ( - remote.index.checksums.intersection(dir_md5s) + dir_md5s.intersection(remote.index.dir_checksums()) - remote_exists ) if missing_dirs: @@ -489,19 +497,14 @@ def _process( raise DownloadError(fails) raise UploadError(fails) elif not download: - pushed_dir_checksums = list( - map(self.path_to_checksum, dir_plans[0]) - ) - pushed_file_checksums = list( - map(self.path_to_checksum, file_plans[0]) - ) + pushed_dir_checksums = map(self.path_to_checksum, dir_plans[0]) + pushed_file_checksums = map(self.path_to_checksum, file_plans[0]) logger.debug( "Adding {} pushed checksums to index".format( - len(pushed_dir_checksums) + len(pushed_file_checksums) + len(dir_plans[0]) + len(file_plans[0]) ) ) remote.index.update(pushed_dir_checksums, pushed_file_checksums) - remote.index.save() return len(dir_plans[0]) + len(file_plans[0]) @@ -521,6 +524,7 @@ def _dir_upload(func, futures, from_info, to_info, name): return 1 return func(from_info, to_info, name) + @index_locked def push( self, named_cache, @@ -538,6 +542,7 @@ def push( drop_index=drop_index, ) + @index_locked def pull( self, named_cache, diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index ab6814fe13..b6c90fde0b 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -1,88 +1,75 @@ -import pickle -import os.path +import os + +import pytest +from funcy import first from dvc.remote.index import RemoteIndex -def test_init(dvc): +@pytest.fixture(scope="function") +def index(dvc): index = RemoteIndex(dvc, "foo") + index.load() + yield index + index.dump() + os.unlink(index.path) + + +def test_init(dvc, index): assert str(index.path) == os.path.join(dvc.index_dir, "foo.idx") -def test_load(dvc): - checksums = {1, 2, 3} - path = os.path.join(dvc.index_dir, "foo.idx") - with open(path, "wb") as fd: - pickle.dump(checksums, fd) - index = RemoteIndex(dvc, "foo") - assert index.checksums == checksums +def test_is_dir_checksum(dvc, index): + assert index.is_dir_checksum("foo.dir") + assert not index.is_dir_checksum("foo") -def test_save(dvc): - index = RemoteIndex(dvc, "foo") - expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) - expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) - index.replace(expected_dir, expected_file) - index.save() - path = os.path.join(dvc.index_dir, "foo.idx") - with open(path, "rb") as fd: - checksums = pickle.load(fd) - assert index.checksums == checksums +def test_roundtrip(dvc, index): + expected_dir = {"1234.dir"} + expected_file = {"5678"} + index.update(expected_dir, expected_file) + index.dump() + index.load() + assert set(index.checksums()) == expected_dir | expected_file -def test_invalidate(dvc): - index = RemoteIndex(dvc, "foo") - index.replace( - ["0123456789abcdef0123456789abcdef.dir"], - ["fedcba9876543210fedcba9876543210"], +def test_invalidate(dvc, index): + index.update( + ["1234.dir"], ["5678"], ) - index.save() index.invalidate() - assert not index.checksums - assert not os.path.exists(index.path) + assert first(index.checksums()) is None -def test_replace(dvc): - index = RemoteIndex(dvc, "foo") - index._dir_checksums = set() - index._file_checksums = set() - expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) - expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) +def test_replace(dvc, index): + index.update(["1234.dir"], ["5678"]) + expected_dir = {"4321.dir"} + expected_file = {"8765"} index.replace(expected_dir, expected_file) - assert index._dir_checksums == expected_dir - assert index._file_checksums == expected_file + assert set(index.dir_checksums()) == expected_dir + assert set(index.checksums()) == expected_dir | expected_file -def test_replace_all(dvc): - index = RemoteIndex(dvc, "foo") - index._dir_checksums = set() - index._file_checksums = set() - expected_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) - expected_file = frozenset(["fedcba9876543210fedcba9876543210"]) +def test_replace_all(dvc, index): + index.update(["1234.dir"], ["5678"]) + expected_dir = {"4321.dir"} + expected_file = {"8765"} index.replace_all(expected_dir | expected_file) - assert index._dir_checksums == expected_dir - assert index._file_checksums == expected_file + assert set(index.dir_checksums()) == expected_dir + assert set(index.checksums()) == expected_dir | expected_file -def test_update(dvc): - index = RemoteIndex(dvc, "foo") - initial_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) - initial_file = frozenset(["fedcba9876543210fedcba9876543210"]) - index.replace(initial_dir, initial_file) - expected_dir = frozenset(["1123456789abcdef0123456789abcdef.dir"]) - expected_file = frozenset(["fedcba9876543210fedcba9876543211"]) +def test_update(dvc, index): + expected_dir = {"1234.dir"} + expected_file = {"5678"} index.update(expected_dir, expected_file) - assert index._dir_checksums == initial_dir | expected_dir - assert index._file_checksums == initial_file | expected_file + assert set(index.dir_checksums()) == expected_dir + assert set(index.checksums()) == expected_dir | expected_file -def test_update_all(dvc): - index = RemoteIndex(dvc, "foo") - initial_dir = frozenset(["0123456789abcdef0123456789abcdef.dir"]) - initial_file = frozenset(["fedcba9876543210fedcba9876543210"]) - index.replace(initial_dir, initial_file) - expected_dir = frozenset(["1123456789abcdef0123456789abcdef.dir"]) - expected_file = frozenset(["fedcba9876543210fedcba9876543211"]) +def test_update_all(dvc, index): + expected_dir = {"1234.dir"} + expected_file = {"5678"} index.update_all(expected_dir | expected_file) - assert index._dir_checksums == initial_dir | expected_dir - assert index._file_checksums == initial_file | expected_file + assert set(index.dir_checksums()) == expected_dir + assert set(index.checksums()) == expected_dir | expected_file From e90e901a1732fcd87566139ffe95e15770fb05da Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 16 Apr 2020 13:48:57 +0900 Subject: [PATCH 11/32] fix deepsource warnings --- dvc/remote/index.py | 6 ++++-- dvc/remote/local.py | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index ae282d123a..779ee45089 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -20,10 +20,12 @@ def __enter__(self): def __exit__(self, typ, value, tbck): pass - def checksums(self): + @staticmethod + def checksums(): return [] - def dir_checksums(self): + @staticmethod + def dir_checksums(): return [] def load(self): diff --git a/dvc/remote/local.py b/dvc/remote/local.py index d5eaa62895..eb7023df97 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -496,7 +496,8 @@ def _process( remote.index.invalidate() raise DownloadError(fails) raise UploadError(fails) - elif not download: + + if not download: pushed_dir_checksums = map(self.path_to_checksum, dir_plans[0]) pushed_file_checksums = map(self.path_to_checksum, file_plans[0]) logger.debug( From bb1748bcc28f8496490f5c0fea69d40107931e9d Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 16 Apr 2020 14:01:53 +0900 Subject: [PATCH 12/32] rename --drop-index to --clear-index --- dvc/command/data_sync.py | 10 +++++----- dvc/command/status.py | 2 +- dvc/data_cloud.py | 18 +++++++++--------- dvc/remote/local.py | 20 ++++++++++---------- dvc/repo/fetch.py | 4 ++-- dvc/repo/pull.py | 4 ++-- dvc/repo/push.py | 4 ++-- dvc/repo/status.py | 8 ++++---- tests/unit/command/test_data_sync.py | 8 ++++---- tests/unit/command/test_status.py | 4 ++-- 10 files changed, 41 insertions(+), 41 deletions(-) diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index 2efc80d891..f462fd7f6e 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -30,7 +30,7 @@ def run(self): with_deps=self.args.with_deps, force=self.args.force, recursive=self.args.recursive, - drop_index=self.args.drop_index, + clear_index=self.args.clear_index, ) log_summary(stats) except (CheckoutError, DvcException) as exc: @@ -54,7 +54,7 @@ def run(self): all_commits=self.args.all_commits, with_deps=self.args.with_deps, recursive=self.args.recursive, - drop_index=self.args.drop_index, + clear_index=self.args.clear_index, ) except DvcException: logger.exception("failed to push data to the cloud") @@ -165,7 +165,7 @@ def add_parser(subparsers, _parent_parser): help="Pull cache for subdirectories of the specified directory.", ) pull_parser.add_argument( - "--drop-index", + "--clear-index", action="store_true", default=False, help="Drop local index for the specified remote.", @@ -220,7 +220,7 @@ def add_parser(subparsers, _parent_parser): help="Push cache for subdirectories of specified directory.", ) push_parser.add_argument( - "--drop-index", + "--clear-index", action="store_true", default=False, help="Drop local index for the remote.", @@ -349,7 +349,7 @@ def add_parser(subparsers, _parent_parser): help="Show status for all dependencies of the specified target.", ) status_parser.add_argument( - "--drop-index", + "--clear-index", action="store_true", default=False, help="Drop local index for the remote.", diff --git a/dvc/command/status.py b/dvc/command/status.py index 4585fd2ef4..e7bd713e40 100644 --- a/dvc/command/status.py +++ b/dvc/command/status.py @@ -49,7 +49,7 @@ def run(self): all_tags=self.args.all_tags, all_commits=self.args.all_commits, with_deps=self.args.with_deps, - drop_index=self.args.drop_index, + clear_index=self.args.clear_index, ) if st: if self.args.quiet: diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index b14c38e056..4e49939cbd 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -54,7 +54,7 @@ def push( jobs=None, remote=None, show_checksums=False, - drop_index=False, + clear_index=False, ): """Push data items in a cloud-agnostic way. @@ -65,14 +65,14 @@ def push( By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. - drop_index (bool): clear local index for the remote + clear_index (bool): clear local index for the remote """ return self.repo.cache.local.push( cache, jobs=jobs, remote=self.get_remote(remote, "push"), show_checksums=show_checksums, - drop_index=drop_index, + clear_index=clear_index, ) def pull( @@ -81,7 +81,7 @@ def pull( jobs=None, remote=None, show_checksums=False, - drop_index=False, + clear_index=False, ): """Pull data items in a cloud-agnostic way. @@ -92,7 +92,7 @@ def pull( By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. - drop_index (bool): clear local index for the remote + clear_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( @@ -100,7 +100,7 @@ def pull( jobs=jobs, remote=remote, show_checksums=show_checksums, - drop_index=drop_index, + clear_index=clear_index, ) if not remote.verify: @@ -124,7 +124,7 @@ def status( jobs=None, remote=None, show_checksums=False, - drop_index=False, + clear_index=False, ): """Check status of data items in a cloud-agnostic way. @@ -136,7 +136,7 @@ def status( is used. show_checksums (bool): show checksums instead of file names in information messages. - drop_index (bool): clear local index for the remote + clear_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( @@ -144,5 +144,5 @@ def status( jobs=jobs, remote=remote, show_checksums=show_checksums, - drop_index=drop_index, + clear_index=clear_index, ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index eb7023df97..79af8b194c 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -265,7 +265,7 @@ def status( jobs=None, show_checksums=False, download=False, - drop_index=False, + clear_index=False, ): # Return flattened dict containing all status info dir_status, file_status, _ = self._status( @@ -274,7 +274,7 @@ def status( jobs=jobs, show_checksums=show_checksums, download=download, - drop_index=drop_index, + clear_index=clear_index, ) return dict(dir_status, **file_status) @@ -285,7 +285,7 @@ def _status( jobs=None, show_checksums=False, download=False, - drop_index=False, + clear_index=False, ): """Return a tuple of (dir_status_info, file_status_info, dir_mapping). @@ -313,7 +313,7 @@ def _status( else: logger.debug("Collecting information from remote cache...") remote_exists = set() - if drop_index: + if clear_index: logger.debug("Clearing local index for remote cache") remote.index.invalidate() dir_md5s = set(named_cache.dir_keys(self.scheme)) @@ -419,7 +419,7 @@ def _process( jobs=None, show_checksums=False, download=False, - drop_index=False, + clear_index=False, ): logger.debug( "Preparing to {} '{}'".format( @@ -448,7 +448,7 @@ def _process( jobs=jobs, show_checksums=show_checksums, download=download, - drop_index=drop_index, + clear_index=clear_index, ) dir_plans = self._get_plans(download, remote, dir_status, status) @@ -532,7 +532,7 @@ def push( remote, jobs=None, show_checksums=False, - drop_index=False, + clear_index=False, ): return self._process( named_cache, @@ -540,7 +540,7 @@ def push( jobs=jobs, show_checksums=show_checksums, download=False, - drop_index=drop_index, + clear_index=clear_index, ) @index_locked @@ -550,7 +550,7 @@ def pull( remote, jobs=None, show_checksums=False, - drop_index=False, + clear_index=False, ): return self._process( named_cache, @@ -558,7 +558,7 @@ def pull( jobs=jobs, show_checksums=show_checksums, download=True, - drop_index=drop_index, + clear_index=clear_index, ) @staticmethod diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index 467870f8fa..6e7b030203 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -20,7 +20,7 @@ def _fetch( all_tags=False, recursive=False, all_commits=False, - drop_index=False, + clear_index=False, ): """Download data items from a cloud and imported repositories @@ -55,7 +55,7 @@ def _fetch( jobs, remote=remote, show_checksums=show_checksums, - drop_index=drop_index, + clear_index=clear_index, ) except NoRemoteError: if not used.external and used["local"]: diff --git a/dvc/repo/pull.py b/dvc/repo/pull.py index 36d99f45b6..5733919171 100644 --- a/dvc/repo/pull.py +++ b/dvc/repo/pull.py @@ -18,7 +18,7 @@ def pull( force=False, recursive=False, all_commits=False, - drop_index=False, + clear_index=False, ): processed_files_count = self._fetch( targets, @@ -29,7 +29,7 @@ def pull( all_commits=all_commits, with_deps=with_deps, recursive=recursive, - drop_index=drop_index, + clear_index=clear_index, ) stats = self._checkout( targets=targets, with_deps=with_deps, force=force, recursive=recursive diff --git a/dvc/repo/push.py b/dvc/repo/push.py index ed515dd4f5..25fd00f471 100644 --- a/dvc/repo/push.py +++ b/dvc/repo/push.py @@ -12,7 +12,7 @@ def push( all_tags=False, recursive=False, all_commits=False, - drop_index=False, + clear_index=False, ): used = self.used_cache( targets, @@ -25,4 +25,4 @@ def push( jobs=jobs, recursive=recursive, ) - return self.cloud.push(used, jobs, remote=remote, drop_index=drop_index) + return self.cloud.push(used, jobs, remote=remote, clear_index=clear_index) diff --git a/dvc/repo/status.py b/dvc/repo/status.py index 4d48a81ba2..20f6eca25a 100644 --- a/dvc/repo/status.py +++ b/dvc/repo/status.py @@ -45,7 +45,7 @@ def _cloud_status( with_deps=False, all_tags=False, all_commits=False, - drop_index=False, + clear_index=False, ): """Returns a dictionary with the files that are new or deleted. @@ -88,7 +88,7 @@ def _cloud_status( ret = {} status_info = self.cloud.status( - used, jobs, remote=remote, drop_index=drop_index + used, jobs, remote=remote, clear_index=clear_index ) for info in status_info.values(): name = info["name"] @@ -114,7 +114,7 @@ def status( with_deps=False, all_tags=False, all_commits=False, - drop_index=False, + clear_index=False, ): if cloud or remote: return _cloud_status( @@ -126,7 +126,7 @@ def status( remote=remote, all_tags=all_tags, all_commits=all_commits, - drop_index=drop_index, + clear_index=clear_index, ) ignored = list( diff --git a/tests/unit/command/test_data_sync.py b/tests/unit/command/test_data_sync.py index 2bc083b70a..27496bc88a 100644 --- a/tests/unit/command/test_data_sync.py +++ b/tests/unit/command/test_data_sync.py @@ -54,7 +54,7 @@ def test_pull(mocker): "--with-deps", "--force", "--recursive", - "--drop-index", + "--clear-index", ] ) assert cli_args.func == CmdDataPull @@ -74,7 +74,7 @@ def test_pull(mocker): with_deps=True, force=True, recursive=True, - drop_index=True, + clear_index=True, ) @@ -93,7 +93,7 @@ def test_push(mocker): "--all-commits", "--with-deps", "--recursive", - "--drop-index", + "--clear-index", ] ) assert cli_args.func == CmdDataPush @@ -112,5 +112,5 @@ def test_push(mocker): all_commits=True, with_deps=True, recursive=True, - drop_index=True, + clear_index=True, ) diff --git a/tests/unit/command/test_status.py b/tests/unit/command/test_status.py index 2663a6bc7c..88f497e70a 100644 --- a/tests/unit/command/test_status.py +++ b/tests/unit/command/test_status.py @@ -17,7 +17,7 @@ def test_cloud_status(mocker): "--all-tags", "--all-commits", "--with-deps", - "--drop-index", + "--clear-index", ] ) assert cli_args.func == CmdDataStatus @@ -36,5 +36,5 @@ def test_cloud_status(mocker): all_tags=True, all_commits=True, with_deps=True, - drop_index=True, + clear_index=True, ) From 53516f9ab542f2bea5505fb82571d27663e60c00 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 16 Apr 2020 14:02:28 +0900 Subject: [PATCH 13/32] rename index.invalidate to index.clear --- dvc/remote/base.py | 2 +- dvc/remote/index.py | 17 +++++++---------- dvc/remote/local.py | 6 +++--- tests/unit/remote/test_index.py | 4 ++-- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 82a4d8d94d..c068658360 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -780,7 +780,7 @@ def gc(self, named_cache, jobs=None): self.remove(path_info) removed = True if removed: - self.index.invalidate() + self.index.clear() return removed def is_protected(self, path_info): diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 779ee45089..564c068dfb 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -34,7 +34,7 @@ def load(self): def dump(self): pass - def invalidate(self): + def clear(self): pass def replace(self, *args): @@ -171,23 +171,20 @@ def dump(self): self.database = None self.cursor = None - def _clear(self): - cmd = "DELETE FROM {}".format(self.INDEX_TABLE) - self._execute(cmd) - - def invalidate(self): - """Invalidate this index (to force re-indexing later). + def clear(self): + """Clear this index (to force re-indexing later). Changes to the index will not committed until dump() is called. """ - self._clear() + cmd = "DELETE FROM {}".format(self.INDEX_TABLE) + self._execute(cmd) def replace(self, dir_checksums, file_checksums): """Replace the contents of this index with the specified checksums. Changes to the index will not committed until dump() is called. """ - self._clear() + self.clear() self.update(dir_checksums, file_checksums) def replace_all(self, checksums): @@ -195,7 +192,7 @@ def replace_all(self, checksums): Changes to the index will not committed until dump() is called. """ - self._clear() + self.clear() self.update_all(checksums) def update(self, dir_checksums, file_checksums): diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 79af8b194c..eabc5636bc 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -315,7 +315,7 @@ def _status( remote_exists = set() if clear_index: logger.debug("Clearing local index for remote cache") - remote.index.invalidate() + remote.index.clear() dir_md5s = set(named_cache.dir_keys(self.scheme)) if dir_md5s: # If .dir checksum exists on the remote, assume directory @@ -345,7 +345,7 @@ def _status( "Remote cache missing indexed .dir checksums '{}', " "clearing local index".format(", ".join(missing_dirs)) ) - remote.index.invalidate() + remote.index.clear() if md5s: remote_exists.update( remote.cache_exists( @@ -493,7 +493,7 @@ def _process( if fails: if download: - remote.index.invalidate() + remote.index.clear() raise DownloadError(fails) raise UploadError(fails) diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index b6c90fde0b..b956a2084c 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -33,11 +33,11 @@ def test_roundtrip(dvc, index): assert set(index.checksums()) == expected_dir | expected_file -def test_invalidate(dvc, index): +def test_clear(dvc, index): index.update( ["1234.dir"], ["5678"], ) - index.invalidate() + index.clear() assert first(index.checksums()) is None From ebb09403ada0c2739652ad8d7e2e739022b9184f Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 16 Apr 2020 17:05:48 +0900 Subject: [PATCH 14/32] functional tests for remote.index --- tests/unit/remote/test_index.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index b956a2084c..8389e83382 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -30,6 +30,7 @@ def test_roundtrip(dvc, index): index.update(expected_dir, expected_file) index.dump() index.load() + assert set(index.dir_checksums()) == expected_dir assert set(index.checksums()) == expected_dir | expected_file From 6b2fcd23b2c35e772c1797d576bad6f89501658b Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 16 Apr 2020 17:06:36 +0900 Subject: [PATCH 15/32] fix index not being updated on status -c --- dvc/remote/local.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index eabc5636bc..3264223c5f 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -314,7 +314,7 @@ def _status( logger.debug("Collecting information from remote cache...") remote_exists = set() if clear_index: - logger.debug("Clearing local index for remote cache") + logger.debug("--clear-index used, clearing remote index") remote.index.clear() dir_md5s = set(named_cache.dir_keys(self.scheme)) if dir_md5s: @@ -334,6 +334,7 @@ def _status( remote_exists.add(dir_checksum) md5s.difference_update(file_checksums) remote_exists.update(file_checksums) + remote.index.update([dir_checksum], file_checksums) # Validate our index by verifying all indexed .dir checksums # still exist on the remote missing_dirs = ( @@ -343,15 +344,15 @@ def _status( if missing_dirs: logger.debug( "Remote cache missing indexed .dir checksums '{}', " - "clearing local index".format(", ".join(missing_dirs)) + "clearing remote index".format(", ".join(missing_dirs)) ) remote.index.clear() if md5s: - remote_exists.update( - remote.cache_exists( - md5s, jobs=jobs, name=str(remote.path_info) - ) + file_checksums = remote.cache_exists( + md5s, jobs=jobs, name=str(remote.path_info) ) + remote_exists.update(file_checksums) + remote.index.update([], file_checksums) def make_names(checksum, names): return {"name": checksum if show_checksums else " ".join(names)} From 26bedbe8c681d85e0cce6d36b15014f9b62ffc40 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 16 Apr 2020 18:18:07 +0900 Subject: [PATCH 16/32] remote: add index.intersection() --- dvc/remote/index.py | 21 ++++++++++++++++++--- tests/unit/remote/test_index.py | 7 +++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 564c068dfb..3df1a77c65 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -3,6 +3,8 @@ import sqlite3 import threading +from funcy import lchunks + from dvc.state import _connect_sqlite logger = logging.getLogger(__name__) @@ -49,6 +51,10 @@ def update(self, *args): def update_all(self, *args): pass + @staticmethod + def intersection(*args): + return [] + class RemoteIndex: """Class for indexing remote checksums in a sqlite3 database. @@ -102,12 +108,10 @@ def dir_checksums(self): def is_dir_checksum(self, checksum): return checksum.endswith(self.dir_suffix) - def _execute(self, cmd, parameters=()): - logger.debug(cmd) + def _execute(self, cmd, parameters=(), quiet=False): return self.cursor.execute(cmd, parameters) def _executemany(self, cmd, seq_of_parameters): - logger.debug(cmd) return self.cursor.executemany(cmd, seq_of_parameters) def _prepare_db(self, empty=False): @@ -225,3 +229,14 @@ def update_all(self, checksums): for checksum in checksums ), ) + + def intersection(self, checksums): + """Iterate over values from `checksums` which exist in the index.""" + # sqlite has a compile time limit of 999, see: + # https://www.sqlite.org/c3ref/c_limit_attached.html#sqlitelimitvariablenumber + for chunk in lchunks(999, checksums): + cmd = "SELECT checksum FROM {} WHERE checksum IN ({})".format( + self.INDEX_TABLE, ",".join("?" for checksum in chunk) + ) + for (checksum,) in self._execute(cmd, chunk): + yield checksum diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 8389e83382..6069bb22a6 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -74,3 +74,10 @@ def test_update_all(dvc, index): index.update_all(expected_dir | expected_file) assert set(index.dir_checksums()) == expected_dir assert set(index.checksums()) == expected_dir | expected_file + + +def test_intersection(dvc, index): + checksums = (str(i) for i in range(2000)) + expected = set(str(i) for i in range(1000)) + index.update([], checksums) + assert set(index.intersection(expected)) == expected From 4251d88ca8672e6d94f00675a74f676b45fc9a85 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 16 Apr 2020 18:18:46 +0900 Subject: [PATCH 17/32] remote: use index when making assumptions about remote .dir contents --- dvc/remote/base.py | 2 +- dvc/remote/local.py | 66 +++++++++++++++++++++++---------------------- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c068658360..7c644274a8 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -900,7 +900,7 @@ def cache_exists(self, checksums, jobs=None, name=None): assert self.TRAVERSE_PREFIX_LEN >= 2 checksums = set(checksums) - indexed_checksums = checksums.intersection(self.index.checksums()) + indexed_checksums = set(self.index.intersection(checksums)) checksums -= indexed_checksums logger.debug( "Matched {} indexed checksums".format(len(indexed_checksums)) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 3264223c5f..1515775e7c 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -318,42 +318,23 @@ def _status( remote.index.clear() dir_md5s = set(named_cache.dir_keys(self.scheme)) if dir_md5s: - # If .dir checksum exists on the remote, assume directory - # contents also exists on the remote - for dir_checksum in remote._cache_object_exists(dir_md5s): - file_checksums = list( - named_cache.child_keys(self.scheme, dir_checksum) - ) - logger.debug( - "'{}' exists on remote, " - "assuming '{}' files also exist".format( - dir_checksum, len(file_checksums) - ) - ) - md5s.remove(dir_checksum) - remote_exists.add(dir_checksum) - md5s.difference_update(file_checksums) - remote_exists.update(file_checksums) - remote.index.update([dir_checksum], file_checksums) - # Validate our index by verifying all indexed .dir checksums - # still exist on the remote - missing_dirs = ( - dir_md5s.intersection(remote.index.dir_checksums()) - - remote_exists + remote_exists.update( + self._indexed_dir_checksums(named_cache, remote, dir_md5s) ) - if missing_dirs: - logger.debug( - "Remote cache missing indexed .dir checksums '{}', " - "clearing remote index".format(", ".join(missing_dirs)) - ) - remote.index.clear() + md5s.difference_update(remote_exists) if md5s: - file_checksums = remote.cache_exists( - md5s, jobs=jobs, name=str(remote.path_info) + remote_exists.update( + remote.cache_exists( + md5s, jobs=jobs, name=str(remote.path_info) + ) ) - remote_exists.update(file_checksums) - remote.index.update([], file_checksums) + return self._make_status( + named_cache, remote, show_checksums, local_exists, remote_exists + ) + def _make_status( + self, named_cache, remote, show_checksums, local_exists, remote_exists + ): def make_names(checksum, names): return {"name": checksum if show_checksums else " ".join(names)} @@ -382,6 +363,27 @@ def make_names(checksum, names): return dir_status, file_status, dir_paths + def _indexed_dir_checksums(self, named_cache, remote, dir_md5s): + # Validate our index by verifying all indexed .dir checksums + # still exist on the remote + dir_exists = remote._cache_object_exists(dir_md5s) + indexed_dirs = set(remote.index.intersection(dir_md5s)) + missing_dirs = indexed_dirs.difference(dir_exists) + if missing_dirs: + logger.debug( + "Remote cache missing indexed .dir checksums '{}', " + "clearing remote index".format(", ".join(missing_dirs)) + ) + remote.index.clear() + yield from dir_exists + + # If .dir checksum exists on the remote, assume any indexed + # directory contents also exists on the remote + for dir_checksum in dir_exists: + yield from remote.index.intersection( + named_cache.child_keys(self.scheme, dir_checksum) + ) + @staticmethod def _fill_statuses(checksum_info_dir, local_exists, remote_exists): # Using sets because they are way faster for lookups From 2674eafc5c52972a59059035698a52b8b6493adc Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Thu, 16 Apr 2020 19:40:09 +0900 Subject: [PATCH 18/32] add missing func tests --- tests/func/remote/test_index.py | 89 +++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/func/remote/test_index.py diff --git a/tests/func/remote/test_index.py b/tests/func/remote/test_index.py new file mode 100644 index 0000000000..dbedcdeeae --- /dev/null +++ b/tests/func/remote/test_index.py @@ -0,0 +1,89 @@ +import pytest + +from dvc.compat import fspath +from dvc.exceptions import DownloadError +from dvc.remote.base import RemoteBASE +from dvc.remote.local import RemoteLOCAL +from dvc.utils.fs import remove + + +@pytest.fixture(scope="function") +def remote(tmp_dir, dvc, tmp_path_factory, mocker): + url = fspath(tmp_path_factory.mktemp("upstream")) + dvc.config["remote"]["upstream"] = {"url": url} + dvc.config["core"]["remote"] = "upstream" + remote = dvc.cloud.get_remote("upstream") + + # patch cache_exists since the local implementation + # normally overrides RemoteBASE.cache_exists. + def cache_exists(self, *args, **kwargs): + return RemoteBASE.cache_exists(self, *args, **kwargs) + + mocker.patch.object(RemoteLOCAL, "cache_exists", cache_exists) + with remote.index: + return remote + + +def test_indexed_on_status(tmp_dir, dvc, tmp_path_factory, remote, mocker): + foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] + bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] + baz = bar.dir_cache[0] + dvc.push() + + expected = {foo.checksum, bar.checksum, baz["md5"]} + mocked_replace = mocker.patch.object(remote.INDEX_CLS, "replace_all") + dvc.status(cloud=True, clear_index=True) + mocked_replace.assert_called_with(expected) + + +def test_indexed_on_push(tmp_dir, dvc, tmp_path_factory, remote, mocker): + foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] + bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] + baz = bar.dir_cache[0] + + mocked_update = mocker.patch.object(remote.INDEX_CLS, "update") + dvc.push() + call_args = mocked_update.call_args + dir_checksums, file_checksums = call_args[0] + assert [bar.checksum] == list(dir_checksums) + assert [foo.checksum, baz["md5"]] == list(file_checksums) + + +def test_indexed_dir_missing(tmp_dir, dvc, tmp_path_factory, remote, mocker): + bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] + mocker.patch.object( + remote.INDEX_CLS, "intersection", return_value=[bar.checksum] + ) + mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear") + dvc.status(cloud=True) + mocked_clear.assert_called_with() + + +def test_clear_index(tmp_dir, dvc, tmp_path_factory, remote, mocker): + mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear") + dvc.status(cloud=True, clear_index=True) + mocked_clear.assert_called_with() + + +def test_clear_on_gc(tmp_dir, dvc, tmp_path_factory, remote, mocker): + (foo,) = tmp_dir.dvc_gen({"foo": "foo content"}) + dvc.push() + dvc.remove(foo.relpath) + + # RemoteLOCAL.index.clear will be called twice in this case + # once for local cache and once for the upstream remote + mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear") + dvc.gc(workspace=True, cloud=True) + assert len(mocked_clear.mock_calls) == 2 + + +def test_clear_on_download_err(tmp_dir, dvc, tmp_path_factory, remote, mocker): + tmp_dir.dvc_gen({"foo": "foo content"}) + dvc.push() + remove(dvc.cache.local.cache_dir) + + mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear") + mocker.patch.object(RemoteLOCAL, "_download", side_effect=Exception) + with pytest.raises(DownloadError): + dvc.pull() + mocked_clear.assert_called_once_with() From aef4973d0b485b76173733b8501d43621077c64f Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 17 Apr 2020 14:26:52 +0900 Subject: [PATCH 19/32] do not index standalone files --- dvc/remote/base.py | 3 --- dvc/remote/local.py | 49 ++++++++++++++++++++++++++++++--------------- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 7c644274a8..40e5024ce0 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -910,8 +910,6 @@ def cache_exists(self, checksums, jobs=None, name=None): if len(checksums) == 1 or not self.CAN_TRAVERSE: remote_checksums = self._cache_object_exists(checksums, jobs, name) - if remote_checksums: - self.index.update_all(remote_checksums) return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method @@ -949,7 +947,6 @@ def cache_exists(self, checksums, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) ) - self.index.replace_all(remote_checksums) return list(indexed_checksums) + list( checksums & set(remote_checksums) ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 1515775e7c..a4e9cbbbfc 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -328,6 +328,18 @@ def _status( md5s, jobs=jobs, name=str(remote.path_info) ) ) + # index new files which are contained inside a directory (do + # not index standalone files) + for dir_checksum in dir_md5s: + new_files = remote_exists.intersection( + named_cache.child_keys(self.scheme, dir_checksum) + ) + logger.debug( + "indexing '{}' new files contained in '{}'".format( + len(new_files), dir_checksum, + ) + ) + remote.index.update([], new_files) return self._make_status( named_cache, remote, show_checksums, local_exists, remote_exists ) @@ -375,14 +387,16 @@ def _indexed_dir_checksums(self, named_cache, remote, dir_md5s): "clearing remote index".format(", ".join(missing_dirs)) ) remote.index.clear() - yield from dir_exists - - # If .dir checksum exists on the remote, assume any indexed - # directory contents also exists on the remote - for dir_checksum in dir_exists: - yield from remote.index.intersection( - named_cache.child_keys(self.scheme, dir_checksum) - ) + else: + # If .dir checksum exists on the remote, assume indexed + # directory contents still exists on the remote + for dir_checksum in dir_exists: + yield from remote.index.intersection( + named_cache.child_keys(self.scheme, dir_checksum) + ) + logger.debug("indexing '{}' new dirs".format(len(dir_exists))) + remote.index.update(dir_exists, []) + yield from dir_exists @staticmethod def _fill_statuses(checksum_info_dir, local_exists, remote_exists): @@ -501,14 +515,17 @@ def _process( raise UploadError(fails) if not download: - pushed_dir_checksums = map(self.path_to_checksum, dir_plans[0]) - pushed_file_checksums = map(self.path_to_checksum, file_plans[0]) - logger.debug( - "Adding {} pushed checksums to index".format( - len(dir_plans[0]) + len(file_plans[0]) - ) - ) - remote.index.update(pushed_dir_checksums, pushed_file_checksums) + # index successfully pushed dirs + for to_info, future in dir_futures.items(): + if future.result() == 0: + dir_checksum = remote.path_to_checksum(to_info) + logger.debug( + "indexing pushed dir '{}'".format(dir_checksum) + ) + remote.index.update( + [dir_checksum], + named_cache.child_keys(self.scheme, dir_checksum), + ) return len(dir_plans[0]) + len(file_plans[0]) From b31034b04f91c7fe270353ae92ccc0748cde332a Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 17 Apr 2020 14:58:26 +0900 Subject: [PATCH 20/32] cleanup index test fixture --- tests/func/remote/test_index.py | 46 +++++++++++++++++---------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/tests/func/remote/test_index.py b/tests/func/remote/test_index.py index dbedcdeeae..81cbb05f19 100644 --- a/tests/func/remote/test_index.py +++ b/tests/func/remote/test_index.py @@ -3,6 +3,7 @@ from dvc.compat import fspath from dvc.exceptions import DownloadError from dvc.remote.base import RemoteBASE +from dvc.remote.index import RemoteIndex from dvc.remote.local import RemoteLOCAL from dvc.utils.fs import remove @@ -12,16 +13,18 @@ def remote(tmp_dir, dvc, tmp_path_factory, mocker): url = fspath(tmp_path_factory.mktemp("upstream")) dvc.config["remote"]["upstream"] = {"url": url} dvc.config["core"]["remote"] = "upstream" - remote = dvc.cloud.get_remote("upstream") - # patch cache_exists since the local implementation - # normally overrides RemoteBASE.cache_exists. + # patch cache_exists since the RemoteLOCAL normally overrides + # RemoteBASE.cache_exists. def cache_exists(self, *args, **kwargs): return RemoteBASE.cache_exists(self, *args, **kwargs) mocker.patch.object(RemoteLOCAL, "cache_exists", cache_exists) - with remote.index: - return remote + + # patch index class since RemoteLOCAL normally overrides index class + mocker.patch.object(RemoteLOCAL, "INDEX_CLS", RemoteIndex) + + return dvc.cloud.get_remote("upstream") def test_indexed_on_status(tmp_dir, dvc, tmp_path_factory, remote, mocker): @@ -29,11 +32,14 @@ def test_indexed_on_status(tmp_dir, dvc, tmp_path_factory, remote, mocker): bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] baz = bar.dir_cache[0] dvc.push() + with remote.index: + remote.index.clear() - expected = {foo.checksum, bar.checksum, baz["md5"]} - mocked_replace = mocker.patch.object(remote.INDEX_CLS, "replace_all") - dvc.status(cloud=True, clear_index=True) - mocked_replace.assert_called_with(expected) + dvc.status(cloud=True) + with remote.index: + assert {bar.checksum, baz["md5"]} == set(remote.index.checksums()) + assert [bar.checksum] == list(remote.index.dir_checksums()) + assert foo.checksum not in remote.index.checksums() def test_indexed_on_push(tmp_dir, dvc, tmp_path_factory, remote, mocker): @@ -41,22 +47,20 @@ def test_indexed_on_push(tmp_dir, dvc, tmp_path_factory, remote, mocker): bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] baz = bar.dir_cache[0] - mocked_update = mocker.patch.object(remote.INDEX_CLS, "update") dvc.push() - call_args = mocked_update.call_args - dir_checksums, file_checksums = call_args[0] - assert [bar.checksum] == list(dir_checksums) - assert [foo.checksum, baz["md5"]] == list(file_checksums) + with remote.index: + assert {bar.checksum, baz["md5"]} == set(remote.index.checksums()) + assert [bar.checksum] == list(remote.index.dir_checksums()) + assert foo.checksum not in remote.index.checksums() def test_indexed_dir_missing(tmp_dir, dvc, tmp_path_factory, remote, mocker): bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] - mocker.patch.object( - remote.INDEX_CLS, "intersection", return_value=[bar.checksum] - ) - mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear") + with remote.index: + remote.index.update([bar.checksum], []) dvc.status(cloud=True) - mocked_clear.assert_called_with() + with remote.index: + assert not list(remote.index.checksums()) def test_clear_index(tmp_dir, dvc, tmp_path_factory, remote, mocker): @@ -70,11 +74,9 @@ def test_clear_on_gc(tmp_dir, dvc, tmp_path_factory, remote, mocker): dvc.push() dvc.remove(foo.relpath) - # RemoteLOCAL.index.clear will be called twice in this case - # once for local cache and once for the upstream remote mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear") dvc.gc(workspace=True, cloud=True) - assert len(mocked_clear.mock_calls) == 2 + mocked_clear.assert_called_with() def test_clear_on_download_err(tmp_dir, dvc, tmp_path_factory, remote, mocker): From 0d5b3f4ef8893e656144b487260a8b63b37b5579 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 17 Apr 2020 15:08:34 +0900 Subject: [PATCH 21/32] fix deepsource warnings --- dvc/remote/index.py | 2 +- tests/unit/remote/test_index.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 3df1a77c65..dd6e0b9ad0 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -108,7 +108,7 @@ def dir_checksums(self): def is_dir_checksum(self, checksum): return checksum.endswith(self.dir_suffix) - def _execute(self, cmd, parameters=(), quiet=False): + def _execute(self, cmd, parameters=()): return self.cursor.execute(cmd, parameters) def _executemany(self, cmd, seq_of_parameters): diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 6069bb22a6..3ad849f622 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -78,6 +78,6 @@ def test_update_all(dvc, index): def test_intersection(dvc, index): checksums = (str(i) for i in range(2000)) - expected = set(str(i) for i in range(1000)) + expected = {str(i) for i in range(1000)} index.update([], checksums) assert set(index.intersection(expected)) == expected From 7ba6305ea7f558eacf579cd098f87da14187fb62 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 17 Apr 2020 15:42:31 +0900 Subject: [PATCH 22/32] update autocompletion scripts/cli help message for --clear-index --- dvc/command/data_sync.py | 6 +++--- scripts/completion/dvc.bash | 6 +++--- scripts/completion/dvc.zsh | 3 +++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index f462fd7f6e..de6d89f3a9 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -168,7 +168,7 @@ def add_parser(subparsers, _parent_parser): "--clear-index", action="store_true", default=False, - help="Drop local index for the specified remote.", + help="Clear local index for the specified remote.", ) pull_parser.set_defaults(func=CmdDataPull) @@ -223,7 +223,7 @@ def add_parser(subparsers, _parent_parser): "--clear-index", action="store_true", default=False, - help="Drop local index for the remote.", + help="Clear local index for the remote.", ) push_parser.set_defaults(func=CmdDataPush) @@ -352,6 +352,6 @@ def add_parser(subparsers, _parent_parser): "--clear-index", action="store_true", default=False, - help="Drop local index for the remote.", + help="Clear local index for the remote.", ) status_parser.set_defaults(func=CmdDataStatus) diff --git a/scripts/completion/dvc.bash b/scripts/completion/dvc.bash index cde8130e62..995ed238c3 100644 --- a/scripts/completion/dvc.bash +++ b/scripts/completion/dvc.bash @@ -51,9 +51,9 @@ _dvc_pipeline='list show' _dvc_pipeline_list='' _dvc_pipeline_show='-c --commands -o --outs --ascii --dot --tree -l --locked' _dvc_pipeline_show_COMPGEN=_dvc_compgen_DVCFiles -_dvc_pull='-j --jobs -r --remote -a --all-branches -T --all-tags -f --force -d --with-deps -R --recursive' +_dvc_pull='-j --jobs -r --remote -a --all-branches -T --all-tags -f --force -d --with-deps -R --recursive --clear-index' _dvc_pull_COMPGEN=_dvc_compgen_DVCFiles -_dvc_push='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -R --recursive' +_dvc_push='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -R --recursive --clear-index' _dvc_push_COMPGEN=_dvc_compgen_DVCFiles _dvc_remote='add default list modify remove' _dvc_remote_add='--global --system --local -d --default -f --force' @@ -68,7 +68,7 @@ _dvc_repro_COMPGEN=_dvc_compgen_DVCFiles _dvc_root='' _dvc_run='--no-exec -f --file -d --deps -o --outs -O --outs-no-cache --outs-persist --outs-persist-no-cache -m --metrics -M --metrics-no-cache --overwrite-dvcfile --ignore-build-cache --no-commit -w --wdir' _dvc_run_COMPGEN=_dvc_compgen_DVCFiles -_dvc_status='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -c --cloud' +_dvc_status='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -c --cloud --clear-index' _dvc_status_COMPGEN=_dvc_compgen_DVCFiles _dvc_unlock_COMPGEN=_dvc_compgen_DVCFiles _dvc_unprotect_COMPGEN=_dvc_compgen_files diff --git a/scripts/completion/dvc.zsh b/scripts/completion/dvc.zsh index 3ed1a56eaa..b696ad8ca0 100644 --- a/scripts/completion/dvc.zsh +++ b/scripts/completion/dvc.zsh @@ -193,6 +193,7 @@ _dvc_pull=( {-d,--with-deps}"[Fetch cache for all dependencies of the specified target.]" {-f,--force}"[Do not prompt when removing working directory files.]" {-R,--recursive}"[Pull cache for subdirectories of the specified directory.]" + {--clear-index}"[Clear local index for the remote.]" "*:Stages:_files -g '(*.dvc|Dvcfile)'" ) @@ -203,6 +204,7 @@ _dvc_push=( {-T,--all-tags}"[Push cache for all tags.]" {-d,--with-deps}"[Push cache for all dependencies of the specified target.]" {-R,--recursive}"[Push cache for subdirectories of specified directory.]" + {--clear-index}"[Clear local index for the remote.]" "*:Stages:_files -g '(*.dvc|Dvcfile)'" ) @@ -263,6 +265,7 @@ _dvc_status=( {-a,--all-branches}"[Show status of a local cache compared to a remote repository for all branches.]" {-T,--all-tags}"[Show status of a local cache compared to a remote repository for all tags.]" {-d,--with-deps}"[Show status for all dependencies of the specified target.]" + {--clear-index}"[Clear local index for the remote.]" "*:Stages:_files -g '(*.dvc|Dvcfile)'" ) From ed77f9b1a5ebd64d82224095bb23b1d3039bbb7d Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 17 Apr 2020 19:26:49 +0900 Subject: [PATCH 23/32] tests: remove unused mocker --- tests/func/remote/test_index.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/func/remote/test_index.py b/tests/func/remote/test_index.py index 81cbb05f19..bb7f49a841 100644 --- a/tests/func/remote/test_index.py +++ b/tests/func/remote/test_index.py @@ -27,7 +27,7 @@ def cache_exists(self, *args, **kwargs): return dvc.cloud.get_remote("upstream") -def test_indexed_on_status(tmp_dir, dvc, tmp_path_factory, remote, mocker): +def test_indexed_on_status(tmp_dir, dvc, tmp_path_factory, remote): foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] baz = bar.dir_cache[0] @@ -42,7 +42,7 @@ def test_indexed_on_status(tmp_dir, dvc, tmp_path_factory, remote, mocker): assert foo.checksum not in remote.index.checksums() -def test_indexed_on_push(tmp_dir, dvc, tmp_path_factory, remote, mocker): +def test_indexed_on_push(tmp_dir, dvc, tmp_path_factory, remote): foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] baz = bar.dir_cache[0] @@ -54,7 +54,7 @@ def test_indexed_on_push(tmp_dir, dvc, tmp_path_factory, remote, mocker): assert foo.checksum not in remote.index.checksums() -def test_indexed_dir_missing(tmp_dir, dvc, tmp_path_factory, remote, mocker): +def test_indexed_dir_missing(tmp_dir, dvc, tmp_path_factory, remote): bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] with remote.index: remote.index.update([bar.checksum], []) From 6f8745ab1a5e4103a5d77b26d47bf1f4ee2d7d0a Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 20 Apr 2020 12:56:06 +0900 Subject: [PATCH 24/32] push: only index successfully pushed dirs - do not index files on partial push/upload --- dvc/remote/local.py | 13 ++++++++----- tests/func/remote/test_index.py | 20 +++++++++++++++++++- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index a4e9cbbbfc..d208bea36a 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -519,13 +519,16 @@ def _process( for to_info, future in dir_futures.items(): if future.result() == 0: dir_checksum = remote.path_to_checksum(to_info) - logger.debug( - "indexing pushed dir '{}'".format(dir_checksum) + file_checksums = list( + named_cache.child_keys(self.scheme, dir_checksum) ) - remote.index.update( - [dir_checksum], - named_cache.child_keys(self.scheme, dir_checksum), + logger.debug( + "Indexing pushed dir '{}' with " + "'{}' nested files".format( + dir_checksum, len(file_checksums) + ) ) + remote.index.update([dir_checksum], file_checksums) return len(dir_plans[0]) + len(file_plans[0]) diff --git a/tests/func/remote/test_index.py b/tests/func/remote/test_index.py index bb7f49a841..c9a3c589b9 100644 --- a/tests/func/remote/test_index.py +++ b/tests/func/remote/test_index.py @@ -1,7 +1,7 @@ import pytest from dvc.compat import fspath -from dvc.exceptions import DownloadError +from dvc.exceptions import DownloadError, UploadError from dvc.remote.base import RemoteBASE from dvc.remote.index import RemoteIndex from dvc.remote.local import RemoteLOCAL @@ -89,3 +89,21 @@ def test_clear_on_download_err(tmp_dir, dvc, tmp_path_factory, remote, mocker): with pytest.raises(DownloadError): dvc.pull() mocked_clear.assert_called_once_with() + + +def test_partial_upload(tmp_dir, dvc, tmp_path_factory, remote, mocker): + tmp_dir.dvc_gen({"foo": "foo content"}) + tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] + + original = RemoteLOCAL._upload + + def unreliable_upload(self, from_file, to_info, name=None, **kwargs): + if "baz" in name: + raise Exception("stop baz") + return original(self, from_file, to_info, name, **kwargs) + + mocker.patch.object(RemoteLOCAL, "_upload", unreliable_upload) + with pytest.raises(UploadError): + dvc.push() + with remote.index: + assert not list(remote.index.checksums()) From 910283820cd7fea2a2964358ebcdcf6b4a1c24c6 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 20 Apr 2020 12:56:53 +0900 Subject: [PATCH 25/32] remote: revert behavior to trust .dir files on remote - skip unnecessary index check for dir contents if .dir file exists on the remote --- dvc/remote/index.py | 7 +++++++ dvc/remote/local.py | 37 +++++++++++++++---------------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index dd6e0b9ad0..71cdbc3b1a 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -95,6 +95,13 @@ def __exit__(self, typ, value, tbck): self.dump() self.lock.release() + def __contains__(self, checksum): + cmd = "SELECT checksum FROM {} WHERE checksum = (?)".format( + self.INDEX_TABLE + ) + self._execute(cmd, (checksum,)) + return self.cursor.fetchone() is not None + def checksums(self): """Iterate over checksums stored in the index.""" return iter(self) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index d208bea36a..a2c0eaa51f 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -328,18 +328,6 @@ def _status( md5s, jobs=jobs, name=str(remote.path_info) ) ) - # index new files which are contained inside a directory (do - # not index standalone files) - for dir_checksum in dir_md5s: - new_files = remote_exists.intersection( - named_cache.child_keys(self.scheme, dir_checksum) - ) - logger.debug( - "indexing '{}' new files contained in '{}'".format( - len(new_files), dir_checksum, - ) - ) - remote.index.update([], new_files) return self._make_status( named_cache, remote, show_checksums, local_exists, remote_exists ) @@ -379,7 +367,7 @@ def _indexed_dir_checksums(self, named_cache, remote, dir_md5s): # Validate our index by verifying all indexed .dir checksums # still exist on the remote dir_exists = remote._cache_object_exists(dir_md5s) - indexed_dirs = set(remote.index.intersection(dir_md5s)) + indexed_dirs = set(remote.index.dir_checksums()) missing_dirs = indexed_dirs.difference(dir_exists) if missing_dirs: logger.debug( @@ -387,16 +375,21 @@ def _indexed_dir_checksums(self, named_cache, remote, dir_md5s): "clearing remote index".format(", ".join(missing_dirs)) ) remote.index.clear() - else: - # If .dir checksum exists on the remote, assume indexed - # directory contents still exists on the remote - for dir_checksum in dir_exists: - yield from remote.index.intersection( - named_cache.child_keys(self.scheme, dir_checksum) + # If .dir checksum exists on the remote, assume directory contents + # still exists on the remote + for dir_checksum in dir_exists: + file_checksums = list( + named_cache.child_keys(self.scheme, dir_checksum) + ) + if dir_checksum not in remote.index: + logger.debug( + "Indexing new .dir '{}' with '{}' nested files".format( + dir_checksum, len(file_checksums) + ) ) - logger.debug("indexing '{}' new dirs".format(len(dir_exists))) - remote.index.update(dir_exists, []) - yield from dir_exists + remote.index.update([dir_checksum], file_checksums) + yield dir_checksum + yield from file_checksums @staticmethod def _fill_statuses(checksum_info_dir, local_exists, remote_exists): From dd845e01f68b661585fb8325af74c26db4c92fd0 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 20 Apr 2020 13:36:50 +0900 Subject: [PATCH 26/32] fix missing RemoteIndexNoop functions --- dvc/remote/index.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 71cdbc3b1a..40a411192c 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -22,6 +22,12 @@ def __enter__(self): def __exit__(self, typ, value, tbck): pass + def __iter__(self): + return iter([]) + + def __contains__(self, checksum): + return False + @staticmethod def checksums(): return [] From 3e0d25815c7825f79ff76bf91b9a7b6e2f075cb9 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 20 Apr 2020 17:04:47 +0900 Subject: [PATCH 27/32] separate index validation from status dir exists query --- dvc/remote/local.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index a2c0eaa51f..31439ae150 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -366,15 +366,24 @@ def make_names(checksum, names): def _indexed_dir_checksums(self, named_cache, remote, dir_md5s): # Validate our index by verifying all indexed .dir checksums # still exist on the remote - dir_exists = remote._cache_object_exists(dir_md5s) indexed_dirs = set(remote.index.dir_checksums()) - missing_dirs = indexed_dirs.difference(dir_exists) - if missing_dirs: - logger.debug( - "Remote cache missing indexed .dir checksums '{}', " - "clearing remote index".format(", ".join(missing_dirs)) + indexed_dir_exists = set() + if indexed_dirs: + indexed_dir_exists.update( + remote._cache_object_exists(indexed_dirs) ) - remote.index.clear() + missing_dirs = indexed_dirs.difference(indexed_dir_exists) + if missing_dirs: + logger.debug( + "Remote cache missing indexed .dir checksums '{}', " + "clearing remote index".format(", ".join(missing_dirs)) + ) + remote.index.clear() + + # Check if non-indexed (new) dir checksums exist on remote + dir_exists = dir_md5s.intersection(indexed_dir_exists) + dir_exists.update(remote._cache_object_exists(dir_md5s - dir_exists)) + # If .dir checksum exists on the remote, assume directory contents # still exists on the remote for dir_checksum in dir_exists: @@ -511,7 +520,7 @@ def _process( # index successfully pushed dirs for to_info, future in dir_futures.items(): if future.result() == 0: - dir_checksum = remote.path_to_checksum(to_info) + dir_checksum = remote.path_to_checksum(str(to_info)) file_checksums = list( named_cache.child_keys(self.scheme, dir_checksum) ) From 739202b80600bf4ab86d84f035c0ab74edbfef7d Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 20 Apr 2020 17:14:15 +0900 Subject: [PATCH 28/32] bugfix: include indexed checksums for all cache_exists cases --- dvc/remote/base.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 40e5024ce0..26b4b170b4 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -933,10 +933,12 @@ def cache_exists(self, checksums, jobs=None, name=None): len(checksums), traverse_weight ) ) - return list( - checksums & remote_checksums - ) + self._cache_object_exists( - checksums - remote_checksums, jobs, name + return ( + list(indexed_checksums) + + list(checksums & remote_checksums) + + self._cache_object_exists( + checksums - remote_checksums, jobs, name + ) ) logger.debug( From 4a7d5a0ba2b66bcf6b993c602e083254a48cd239 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 20 Apr 2020 22:54:40 +0900 Subject: [PATCH 29/32] review fix: cleanup debug messages --- dvc/remote/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 26b4b170b4..90fc167e16 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -903,7 +903,7 @@ def cache_exists(self, checksums, jobs=None, name=None): indexed_checksums = set(self.index.intersection(checksums)) checksums -= indexed_checksums logger.debug( - "Matched {} indexed checksums".format(len(indexed_checksums)) + "Matched '{}' indexed checksums".format(len(indexed_checksums)) ) if not checksums: return indexed_checksums @@ -942,7 +942,7 @@ def cache_exists(self, checksums, jobs=None, name=None): ) logger.debug( - "Querying {} checksums via traverse".format(len(checksums)) + "Querying '{}' checksums via traverse".format(len(checksums)) ) remote_checksums = set( self._cache_checksums_traverse( From f6a684a18ecc555c6cc780521635946578758c62 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 20 Apr 2020 22:59:31 +0900 Subject: [PATCH 30/32] tests: fix DS warning --- tests/func/remote/test_index.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/func/remote/test_index.py b/tests/func/remote/test_index.py index c9a3c589b9..8c8df3f8c1 100644 --- a/tests/func/remote/test_index.py +++ b/tests/func/remote/test_index.py @@ -93,7 +93,7 @@ def test_clear_on_download_err(tmp_dir, dvc, tmp_path_factory, remote, mocker): def test_partial_upload(tmp_dir, dvc, tmp_path_factory, remote, mocker): tmp_dir.dvc_gen({"foo": "foo content"}) - tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] + tmp_dir.dvc_gen({"bar": {"baz": "baz content"}}) original = RemoteLOCAL._upload From 63c7d9d26a52b900b67916c42bf4dfc8a8d35a5f Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 21 Apr 2020 12:14:01 +0900 Subject: [PATCH 31/32] remove --clear-index --- dvc/command/data_sync.py | 20 ------------- dvc/command/status.py | 1 - dvc/data_cloud.py | 43 ++++------------------------ dvc/remote/local.py | 28 ++---------------- dvc/repo/fetch.py | 7 +---- dvc/repo/pull.py | 2 -- dvc/repo/push.py | 3 +- dvc/repo/status.py | 7 +---- scripts/completion/dvc.bash | 6 ++-- scripts/completion/dvc.zsh | 3 -- tests/func/remote/test_index.py | 6 ---- tests/unit/command/test_data_sync.py | 4 --- tests/unit/command/test_status.py | 2 -- 13 files changed, 13 insertions(+), 119 deletions(-) diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index de6d89f3a9..3091fc915a 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -30,7 +30,6 @@ def run(self): with_deps=self.args.with_deps, force=self.args.force, recursive=self.args.recursive, - clear_index=self.args.clear_index, ) log_summary(stats) except (CheckoutError, DvcException) as exc: @@ -54,7 +53,6 @@ def run(self): all_commits=self.args.all_commits, with_deps=self.args.with_deps, recursive=self.args.recursive, - clear_index=self.args.clear_index, ) except DvcException: logger.exception("failed to push data to the cloud") @@ -164,12 +162,6 @@ def add_parser(subparsers, _parent_parser): default=False, help="Pull cache for subdirectories of the specified directory.", ) - pull_parser.add_argument( - "--clear-index", - action="store_true", - default=False, - help="Clear local index for the specified remote.", - ) pull_parser.set_defaults(func=CmdDataPull) # Push @@ -219,12 +211,6 @@ def add_parser(subparsers, _parent_parser): default=False, help="Push cache for subdirectories of specified directory.", ) - push_parser.add_argument( - "--clear-index", - action="store_true", - default=False, - help="Clear local index for the remote.", - ) push_parser.set_defaults(func=CmdDataPush) # Fetch @@ -348,10 +334,4 @@ def add_parser(subparsers, _parent_parser): default=False, help="Show status for all dependencies of the specified target.", ) - status_parser.add_argument( - "--clear-index", - action="store_true", - default=False, - help="Clear local index for the remote.", - ) status_parser.set_defaults(func=CmdDataStatus) diff --git a/dvc/command/status.py b/dvc/command/status.py index e7bd713e40..a441f71369 100644 --- a/dvc/command/status.py +++ b/dvc/command/status.py @@ -49,7 +49,6 @@ def run(self): all_tags=self.args.all_tags, all_commits=self.args.all_commits, with_deps=self.args.with_deps, - clear_index=self.args.clear_index, ) if st: if self.args.quiet: diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index 4e49939cbd..6fe2683ee2 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -48,14 +48,7 @@ def get_remote(self, remote=None, command=""): def _init_remote(self, remote): return Remote(self.repo, name=remote) - def push( - self, - cache, - jobs=None, - remote=None, - show_checksums=False, - clear_index=False, - ): + def push(self, cache, jobs=None, remote=None, show_checksums=False): """Push data items in a cloud-agnostic way. Args: @@ -65,24 +58,15 @@ def push( By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. - clear_index (bool): clear local index for the remote """ return self.repo.cache.local.push( cache, jobs=jobs, remote=self.get_remote(remote, "push"), show_checksums=show_checksums, - clear_index=clear_index, ) - def pull( - self, - cache, - jobs=None, - remote=None, - show_checksums=False, - clear_index=False, - ): + def pull(self, cache, jobs=None, remote=None, show_checksums=False): """Pull data items in a cloud-agnostic way. Args: @@ -92,15 +76,10 @@ def pull( By default remote from core.remote config option is used. show_checksums (bool): show checksums instead of file names in information messages. - clear_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( - cache, - jobs=jobs, - remote=remote, - show_checksums=show_checksums, - clear_index=clear_index, + cache, jobs=jobs, remote=remote, show_checksums=show_checksums ) if not remote.verify: @@ -118,14 +97,7 @@ def _save_pulled_checksums(self, cache): # (see `RemoteBASE.download()`) self.repo.state.save(cache_file, checksum) - def status( - self, - cache, - jobs=None, - remote=None, - show_checksums=False, - clear_index=False, - ): + def status(self, cache, jobs=None, remote=None, show_checksums=False): """Check status of data items in a cloud-agnostic way. Args: @@ -136,13 +108,8 @@ def status( is used. show_checksums (bool): show checksums instead of file names in information messages. - clear_index (bool): clear local index for the remote """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( - cache, - jobs=jobs, - remote=remote, - show_checksums=show_checksums, - clear_index=clear_index, + cache, jobs=jobs, remote=remote, show_checksums=show_checksums ) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 31439ae150..2e247bf7d6 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -265,7 +265,6 @@ def status( jobs=None, show_checksums=False, download=False, - clear_index=False, ): # Return flattened dict containing all status info dir_status, file_status, _ = self._status( @@ -274,7 +273,6 @@ def status( jobs=jobs, show_checksums=show_checksums, download=download, - clear_index=clear_index, ) return dict(dir_status, **file_status) @@ -285,7 +283,6 @@ def _status( jobs=None, show_checksums=False, download=False, - clear_index=False, ): """Return a tuple of (dir_status_info, file_status_info, dir_mapping). @@ -313,9 +310,6 @@ def _status( else: logger.debug("Collecting information from remote cache...") remote_exists = set() - if clear_index: - logger.debug("--clear-index used, clearing remote index") - remote.index.clear() dir_md5s = set(named_cache.dir_keys(self.scheme)) if dir_md5s: remote_exists.update( @@ -438,7 +432,6 @@ def _process( jobs=None, show_checksums=False, download=False, - clear_index=False, ): logger.debug( "Preparing to {} '{}'".format( @@ -467,7 +460,6 @@ def _process( jobs=jobs, show_checksums=show_checksums, download=download, - clear_index=clear_index, ) dir_plans = self._get_plans(download, remote, dir_status, status) @@ -551,39 +543,23 @@ def _dir_upload(func, futures, from_info, to_info, name): return func(from_info, to_info, name) @index_locked - def push( - self, - named_cache, - remote, - jobs=None, - show_checksums=False, - clear_index=False, - ): + def push(self, named_cache, remote, jobs=None, show_checksums=False): return self._process( named_cache, remote, jobs=jobs, show_checksums=show_checksums, download=False, - clear_index=clear_index, ) @index_locked - def pull( - self, - named_cache, - remote, - jobs=None, - show_checksums=False, - clear_index=False, - ): + def pull(self, named_cache, remote, jobs=None, show_checksums=False): return self._process( named_cache, remote, jobs=jobs, show_checksums=show_checksums, download=True, - clear_index=clear_index, ) @staticmethod diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index 6e7b030203..888f552da0 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -20,7 +20,6 @@ def _fetch( all_tags=False, recursive=False, all_commits=False, - clear_index=False, ): """Download data items from a cloud and imported repositories @@ -51,11 +50,7 @@ def _fetch( try: downloaded += self.cloud.pull( - used, - jobs, - remote=remote, - show_checksums=show_checksums, - clear_index=clear_index, + used, jobs, remote=remote, show_checksums=show_checksums ) except NoRemoteError: if not used.external and used["local"]: diff --git a/dvc/repo/pull.py b/dvc/repo/pull.py index 5733919171..4623f6a4ec 100644 --- a/dvc/repo/pull.py +++ b/dvc/repo/pull.py @@ -18,7 +18,6 @@ def pull( force=False, recursive=False, all_commits=False, - clear_index=False, ): processed_files_count = self._fetch( targets, @@ -29,7 +28,6 @@ def pull( all_commits=all_commits, with_deps=with_deps, recursive=recursive, - clear_index=clear_index, ) stats = self._checkout( targets=targets, with_deps=with_deps, force=force, recursive=recursive diff --git a/dvc/repo/push.py b/dvc/repo/push.py index 25fd00f471..43946d37c0 100644 --- a/dvc/repo/push.py +++ b/dvc/repo/push.py @@ -12,7 +12,6 @@ def push( all_tags=False, recursive=False, all_commits=False, - clear_index=False, ): used = self.used_cache( targets, @@ -25,4 +24,4 @@ def push( jobs=jobs, recursive=recursive, ) - return self.cloud.push(used, jobs, remote=remote, clear_index=clear_index) + return self.cloud.push(used, jobs, remote=remote) diff --git a/dvc/repo/status.py b/dvc/repo/status.py index 20f6eca25a..40780c66ec 100644 --- a/dvc/repo/status.py +++ b/dvc/repo/status.py @@ -45,7 +45,6 @@ def _cloud_status( with_deps=False, all_tags=False, all_commits=False, - clear_index=False, ): """Returns a dictionary with the files that are new or deleted. @@ -87,9 +86,7 @@ def _cloud_status( ) ret = {} - status_info = self.cloud.status( - used, jobs, remote=remote, clear_index=clear_index - ) + status_info = self.cloud.status(used, jobs, remote=remote) for info in status_info.values(): name = info["name"] status = info["status"] @@ -114,7 +111,6 @@ def status( with_deps=False, all_tags=False, all_commits=False, - clear_index=False, ): if cloud or remote: return _cloud_status( @@ -126,7 +122,6 @@ def status( remote=remote, all_tags=all_tags, all_commits=all_commits, - clear_index=clear_index, ) ignored = list( diff --git a/scripts/completion/dvc.bash b/scripts/completion/dvc.bash index 995ed238c3..cde8130e62 100644 --- a/scripts/completion/dvc.bash +++ b/scripts/completion/dvc.bash @@ -51,9 +51,9 @@ _dvc_pipeline='list show' _dvc_pipeline_list='' _dvc_pipeline_show='-c --commands -o --outs --ascii --dot --tree -l --locked' _dvc_pipeline_show_COMPGEN=_dvc_compgen_DVCFiles -_dvc_pull='-j --jobs -r --remote -a --all-branches -T --all-tags -f --force -d --with-deps -R --recursive --clear-index' +_dvc_pull='-j --jobs -r --remote -a --all-branches -T --all-tags -f --force -d --with-deps -R --recursive' _dvc_pull_COMPGEN=_dvc_compgen_DVCFiles -_dvc_push='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -R --recursive --clear-index' +_dvc_push='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -R --recursive' _dvc_push_COMPGEN=_dvc_compgen_DVCFiles _dvc_remote='add default list modify remove' _dvc_remote_add='--global --system --local -d --default -f --force' @@ -68,7 +68,7 @@ _dvc_repro_COMPGEN=_dvc_compgen_DVCFiles _dvc_root='' _dvc_run='--no-exec -f --file -d --deps -o --outs -O --outs-no-cache --outs-persist --outs-persist-no-cache -m --metrics -M --metrics-no-cache --overwrite-dvcfile --ignore-build-cache --no-commit -w --wdir' _dvc_run_COMPGEN=_dvc_compgen_DVCFiles -_dvc_status='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -c --cloud --clear-index' +_dvc_status='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -c --cloud' _dvc_status_COMPGEN=_dvc_compgen_DVCFiles _dvc_unlock_COMPGEN=_dvc_compgen_DVCFiles _dvc_unprotect_COMPGEN=_dvc_compgen_files diff --git a/scripts/completion/dvc.zsh b/scripts/completion/dvc.zsh index b696ad8ca0..3ed1a56eaa 100644 --- a/scripts/completion/dvc.zsh +++ b/scripts/completion/dvc.zsh @@ -193,7 +193,6 @@ _dvc_pull=( {-d,--with-deps}"[Fetch cache for all dependencies of the specified target.]" {-f,--force}"[Do not prompt when removing working directory files.]" {-R,--recursive}"[Pull cache for subdirectories of the specified directory.]" - {--clear-index}"[Clear local index for the remote.]" "*:Stages:_files -g '(*.dvc|Dvcfile)'" ) @@ -204,7 +203,6 @@ _dvc_push=( {-T,--all-tags}"[Push cache for all tags.]" {-d,--with-deps}"[Push cache for all dependencies of the specified target.]" {-R,--recursive}"[Push cache for subdirectories of specified directory.]" - {--clear-index}"[Clear local index for the remote.]" "*:Stages:_files -g '(*.dvc|Dvcfile)'" ) @@ -265,7 +263,6 @@ _dvc_status=( {-a,--all-branches}"[Show status of a local cache compared to a remote repository for all branches.]" {-T,--all-tags}"[Show status of a local cache compared to a remote repository for all tags.]" {-d,--with-deps}"[Show status for all dependencies of the specified target.]" - {--clear-index}"[Clear local index for the remote.]" "*:Stages:_files -g '(*.dvc|Dvcfile)'" ) diff --git a/tests/func/remote/test_index.py b/tests/func/remote/test_index.py index 8c8df3f8c1..3158e61f2f 100644 --- a/tests/func/remote/test_index.py +++ b/tests/func/remote/test_index.py @@ -63,12 +63,6 @@ def test_indexed_dir_missing(tmp_dir, dvc, tmp_path_factory, remote): assert not list(remote.index.checksums()) -def test_clear_index(tmp_dir, dvc, tmp_path_factory, remote, mocker): - mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear") - dvc.status(cloud=True, clear_index=True) - mocked_clear.assert_called_with() - - def test_clear_on_gc(tmp_dir, dvc, tmp_path_factory, remote, mocker): (foo,) = tmp_dir.dvc_gen({"foo": "foo content"}) dvc.push() diff --git a/tests/unit/command/test_data_sync.py b/tests/unit/command/test_data_sync.py index 27496bc88a..41d464dbd9 100644 --- a/tests/unit/command/test_data_sync.py +++ b/tests/unit/command/test_data_sync.py @@ -54,7 +54,6 @@ def test_pull(mocker): "--with-deps", "--force", "--recursive", - "--clear-index", ] ) assert cli_args.func == CmdDataPull @@ -74,7 +73,6 @@ def test_pull(mocker): with_deps=True, force=True, recursive=True, - clear_index=True, ) @@ -93,7 +91,6 @@ def test_push(mocker): "--all-commits", "--with-deps", "--recursive", - "--clear-index", ] ) assert cli_args.func == CmdDataPush @@ -112,5 +109,4 @@ def test_push(mocker): all_commits=True, with_deps=True, recursive=True, - clear_index=True, ) diff --git a/tests/unit/command/test_status.py b/tests/unit/command/test_status.py index 88f497e70a..7b8ea33248 100644 --- a/tests/unit/command/test_status.py +++ b/tests/unit/command/test_status.py @@ -17,7 +17,6 @@ def test_cloud_status(mocker): "--all-tags", "--all-commits", "--with-deps", - "--clear-index", ] ) assert cli_args.func == CmdDataStatus @@ -36,5 +35,4 @@ def test_cloud_status(mocker): all_tags=True, all_commits=True, with_deps=True, - clear_index=True, ) From 3ec83f60227c488fb325952c1c5c02664f3d0a7c Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 21 Apr 2020 19:33:29 +0900 Subject: [PATCH 32/32] remove unused index functions --- dvc/remote/index.py | 41 --------------------------------- tests/unit/remote/test_index.py | 26 --------------------- 2 files changed, 67 deletions(-) diff --git a/dvc/remote/index.py b/dvc/remote/index.py index 40a411192c..e56a9f99bd 100644 --- a/dvc/remote/index.py +++ b/dvc/remote/index.py @@ -45,18 +45,9 @@ def dump(self): def clear(self): pass - def replace(self, *args): - pass - - def replace_all(self, *args): - pass - def update(self, *args): pass - def update_all(self, *args): - pass - @staticmethod def intersection(*args): return [] @@ -196,22 +187,6 @@ def clear(self): cmd = "DELETE FROM {}".format(self.INDEX_TABLE) self._execute(cmd) - def replace(self, dir_checksums, file_checksums): - """Replace the contents of this index with the specified checksums. - - Changes to the index will not committed until dump() is called. - """ - self.clear() - self.update(dir_checksums, file_checksums) - - def replace_all(self, checksums): - """Replace the contents of this index with the specified checksums. - - Changes to the index will not committed until dump() is called. - """ - self.clear() - self.update_all(checksums) - def update(self, dir_checksums, file_checksums): """Update this index, adding the specified checksums. @@ -227,22 +202,6 @@ def update(self, dir_checksums, file_checksums): cmd, ((checksum, False) for checksum in file_checksums) ) - def update_all(self, checksums): - """Update this index, adding the specified checksums. - - Changes to the index will not committed until dump() is called. - """ - cmd = "INSERT OR IGNORE INTO {} (checksum, dir) VALUES (?, ?)".format( - self.INDEX_TABLE - ) - self._executemany( - cmd, - ( - (checksum, self.is_dir_checksum(checksum)) - for checksum in checksums - ), - ) - def intersection(self, checksums): """Iterate over values from `checksums` which exist in the index.""" # sqlite has a compile time limit of 999, see: diff --git a/tests/unit/remote/test_index.py b/tests/unit/remote/test_index.py index 3ad849f622..2331f0b252 100644 --- a/tests/unit/remote/test_index.py +++ b/tests/unit/remote/test_index.py @@ -42,24 +42,6 @@ def test_clear(dvc, index): assert first(index.checksums()) is None -def test_replace(dvc, index): - index.update(["1234.dir"], ["5678"]) - expected_dir = {"4321.dir"} - expected_file = {"8765"} - index.replace(expected_dir, expected_file) - assert set(index.dir_checksums()) == expected_dir - assert set(index.checksums()) == expected_dir | expected_file - - -def test_replace_all(dvc, index): - index.update(["1234.dir"], ["5678"]) - expected_dir = {"4321.dir"} - expected_file = {"8765"} - index.replace_all(expected_dir | expected_file) - assert set(index.dir_checksums()) == expected_dir - assert set(index.checksums()) == expected_dir | expected_file - - def test_update(dvc, index): expected_dir = {"1234.dir"} expected_file = {"5678"} @@ -68,14 +50,6 @@ def test_update(dvc, index): assert set(index.checksums()) == expected_dir | expected_file -def test_update_all(dvc, index): - expected_dir = {"1234.dir"} - expected_file = {"5678"} - index.update_all(expected_dir | expected_file) - assert set(index.dir_checksums()) == expected_dir - assert set(index.checksums()) == expected_dir | expected_file - - def test_intersection(dvc, index): checksums = (str(i) for i in range(2000)) expected = {str(i) for i in range(1000)}