diff --git a/dvc/cache/base.py b/dvc/cache/base.py index 8a6b089549..54f60c5689 100644 --- a/dvc/cache/base.py +++ b/dvc/cache/base.py @@ -115,7 +115,7 @@ def _filter_hash_info(self, hash_info, path_info, filter_info): return self._get_dir_info_hash(filtered_dir_info)[0] - def changed(self, path_info, hash_info, filter_info=None): + def changed(self, path_info, tree, hash_info, filter_info=None): """Checks if data has changed. A file is considered changed if: @@ -138,7 +138,7 @@ def changed(self, path_info, hash_info, filter_info=None): "checking if '%s'('%s') has changed.", path_info, hash_info ) - if not self.tree.exists(path): + if not tree.exists(path): logger.debug("'%s' doesn't exist.", path) return True @@ -151,7 +151,7 @@ def changed(self, path_info, hash_info, filter_info=None): logger.debug("cache for '%s'('%s') has changed.", path, hi) return True - actual = self.tree.get_hash(path) + actual = tree.get_hash(path) if hi != actual: logger.debug( "hash value '%s' for '%s' has changed (actual '%s').", @@ -218,40 +218,25 @@ def _do_link(self, from_info, to_info, link_method): "Created '%s': %s -> %s", self.cache_types[0], from_info, to_info, ) - def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs): + def _save_file(self, path_info, tree, hash_info, **kwargs): assert hash_info cache_info = self.tree.hash_to_path_info(hash_info.value) - if tree == self.tree: - if self.changed_cache(hash_info): - self.makedirs(cache_info.parent) - self.move(path_info, cache_info) - self.protect(cache_info) - self.link(cache_info, path_info) - elif self.tree.iscopy(path_info) and self._cache_is_copy( - path_info - ): - # Default relink procedure involves unneeded copy - self.unprotect(path_info) + if self.changed_cache(hash_info): + # using our makedirs to create dirs with proper permissions + self.makedirs(cache_info.parent) + if isinstance(tree, type(self.tree)): + self.tree.move(path_info, cache_info) else: - self.tree.remove(path_info) - self.link(cache_info, path_info) - - if save_link: - self.tree.state.save_link(path_info) - # we need to update path and cache, since in case of reflink, - # or copy cache type moving original file results in updates on - # next executed command, which causes md5 recalculation - self.tree.state.save(path_info, hash_info) - else: - if self.changed_cache(hash_info): with tree.open(path_info, mode="rb") as fobj: self.tree.upload_fobj(fobj, cache_info) - callback = kwargs.get("download_callback") - if callback: - callback(1) + tree.state.save(path_info, hash_info) + self.protect(cache_info) + self.tree.state.save(cache_info, hash_info) - self.tree.state.save(cache_info, hash_info) + callback = kwargs.get("download_callback") + if callback: + callback(1) def _transfer_file(self, from_tree, from_info): from dvc.utils import tmp_fname @@ -399,13 +384,7 @@ def save_dir_info(self, dir_info, hash_info=None): return hi def _save_dir( - self, - path_info, - tree, - hash_info, - save_link=True, - filter_info=None, - **kwargs, + self, path_info, tree, hash_info, filter_info=None, **kwargs, ): if not hash_info.dir_info: hash_info.dir_info = tree.cache.get_dir_cache(hash_info) @@ -418,20 +397,14 @@ def _save_dir( if filter_info and not entry_info.isin_or_eq(filter_info): continue - self._save_file( - entry_info, tree, entry_hash, save_link=False, **kwargs - ) - - if save_link: - self.tree.state.save_link(path_info) - if self.tree.exists(path_info): - self.tree.state.save(path_info, hi) + self._save_file(entry_info, tree, entry_hash, **kwargs) cache_info = self.tree.hash_to_path_info(hi.value) self.tree.state.save(cache_info, hi) + tree.state.save(path_info, hi) @use_state - def save(self, path_info, tree, hash_info, save_link=True, **kwargs): + def save(self, path_info, tree, hash_info, **kwargs): if path_info.scheme != self.tree.scheme: raise RemoteActionNotImplemented( f"save {path_info.scheme} -> {self.tree.scheme}", @@ -447,17 +420,17 @@ def save(self, path_info, tree, hash_info, save_link=True, **kwargs): errno.ENOENT, os.strerror(errno.ENOENT), path_info ) - self._save(path_info, tree, hash_info, save_link, **kwargs) + self._save(path_info, tree, hash_info, **kwargs) return hash_info - def _save(self, path_info, tree, hash_info, save_link=True, **kwargs): + def _save(self, path_info, tree, hash_info, **kwargs): to_info = self.tree.hash_to_path_info(hash_info.value) logger.debug("Saving '%s' to '%s'.", path_info, to_info) if tree.isdir(path_info): - self._save_dir(path_info, tree, hash_info, save_link, **kwargs) + self._save_dir(path_info, tree, hash_info, **kwargs) else: - self._save_file(path_info, tree, hash_info, save_link, **kwargs) + self._save_file(path_info, tree, hash_info, **kwargs) # Override to return path as a string instead of PathInfo for clouds # which support string paths (see local) @@ -542,19 +515,19 @@ def changed_cache(self, hash_info, path_info=None, filter_info=None): ) return self.changed_cache_file(hash_info) - def already_cached(self, path_info): - current = self.tree.get_hash(path_info) + def already_cached(self, path_info, tree): + current = tree.get_hash(path_info) if not current: return False return not self.changed_cache(current) - def safe_remove(self, path_info, force=False): - if not self.tree.exists(path_info): + def safe_remove(self, path_info, tree, force=False): + if not tree.exists(path_info): return - if not force and not self.already_cached(path_info): + if not force and not self.already_cached(path_info, tree): msg = ( "file '{}' is going to be removed." " Are you sure you want to proceed?".format(str(path_info)) @@ -563,30 +536,48 @@ def safe_remove(self, path_info, force=False): if not prompt.confirm(msg): raise ConfirmRemoveError(str(path_info)) - self.tree.remove(path_info) + tree.remove(path_info) def _checkout_file( - self, path_info, hash_info, force, progress_callback=None, relink=False + self, + path_info, + tree, + hash_info, + force, + progress_callback=None, + relink=False, ): """The file is changed we need to checkout a new copy""" - added, modified = True, False cache_info = self.tree.hash_to_path_info(hash_info.value) - if self.tree.exists(path_info): - logger.debug("data '%s' will be replaced.", path_info) - self.safe_remove(path_info, force=force) - added, modified = False, True - - self.link(cache_info, path_info) - self.tree.state.save_link(path_info) - self.tree.state.save(path_info, hash_info) + if tree.exists(path_info): + added = False + + if not relink and self.changed(path_info, tree, hash_info): + modified = True + self.safe_remove(path_info, tree, force=force) + self.link(cache_info, path_info) + else: + modified = False + + if tree.iscopy(path_info) and self._cache_is_copy(path_info): + self.unprotect(path_info) + else: + self.safe_remove(path_info, tree, force=force) + self.link(cache_info, path_info) + else: + self.link(cache_info, path_info) + added, modified = True, False + + tree.state.save(path_info, hash_info) if progress_callback: progress_callback(str(path_info)) - return added, modified and not relink + return added, modified def _checkout_dir( self, path_info, + tree, hash_info, force, progress_callback=None, @@ -596,7 +587,7 @@ def _checkout_dir( added, modified = False, False # Create dir separately so that dir is created # even if there are no files in it - if not self.tree.exists(path_info): + if not tree.exists(path_info): added = True self.makedirs(path_info) @@ -605,39 +596,37 @@ def _checkout_dir( logger.debug("Linking directory '%s'.", path_info) for entry_info, entry_hash_info in dir_info.items(path_info): - entry_cache_info = self.tree.hash_to_path_info( - entry_hash_info.value - ) - if filter_info and not entry_info.isin_or_eq(filter_info): continue - if relink or self.changed(entry_info, entry_hash_info): + entry_added, entry_modified = self._checkout_file( + entry_info, + tree, + entry_hash_info, + force, + progress_callback, + relink, + ) + if entry_added or entry_modified: modified = True - self.safe_remove(entry_info, force=force) - self.link(entry_cache_info, entry_info) - self.tree.state.save(entry_info, entry_hash_info) - if progress_callback: - progress_callback(str(entry_info)) modified = ( - self._remove_redundant_files(path_info, dir_info, force) + self._remove_redundant_files(path_info, tree, dir_info, force) or modified ) - self.tree.state.save_link(path_info) - self.tree.state.save(path_info, hash_info) + tree.state.save(path_info, hash_info) # relink is not modified, assume it as nochange return added, not added and modified and not relink - def _remove_redundant_files(self, path_info, dir_info, force): - existing_files = set(self.tree.walk_files(path_info)) + def _remove_redundant_files(self, path_info, tree, dir_info, force): + existing_files = set(tree.walk_files(path_info)) needed_files = {info for info, _ in dir_info.items(path_info)} redundant_files = existing_files - needed_files for path in redundant_files: - self.safe_remove(path, force) + self.safe_remove(path, tree, force) return bool(redundant_files) @@ -645,6 +634,7 @@ def _remove_redundant_files(self, path_info, dir_info, force): def checkout( self, path_info, + tree, hash_info, force=False, progress_callback=None, @@ -663,11 +653,11 @@ def checkout( "No file hash info found for '%s'. It won't be created.", path_info, ) - self.safe_remove(path_info, force=force) + self.safe_remove(path_info, tree, force=force) failed = path_info elif not relink and not self.changed( - path_info, hash_info, filter_info=filter_info + path_info, tree, hash_info, filter_info=filter_info ): logger.trace("Data '%s' didn't change.", path_info) skip = True @@ -681,7 +671,7 @@ def checkout( hash_info, path_info, ) - self.safe_remove(path_info, force=force) + self.safe_remove(path_info, tree, force=force) failed = path_info if failed or skip: @@ -702,6 +692,7 @@ def checkout( return self._checkout( path_info, + tree, hash_info, force, progress_callback, @@ -712,6 +703,7 @@ def checkout( def _checkout( self, path_info, + tree, hash_info, force=False, progress_callback=None, @@ -719,13 +711,23 @@ def _checkout( filter_info=None, ): if not hash_info.isdir: - return self._checkout_file( - path_info, hash_info, force, progress_callback, relink + ret = self._checkout_file( + path_info, tree, hash_info, force, progress_callback, relink + ) + else: + ret = self._checkout_dir( + path_info, + tree, + hash_info, + force, + progress_callback, + relink, + filter_info, ) - return self._checkout_dir( - path_info, hash_info, force, progress_callback, relink, filter_info - ) + tree.state.save_link(path_info) + + return ret def get_files_number(self, path_info, hash_info, filter_info): from funcy.py3 import ilen diff --git a/dvc/cache/local.py b/dvc/cache/local.py index c35f67208c..911c39bb91 100644 --- a/dvc/cache/local.py +++ b/dvc/cache/local.py @@ -81,10 +81,10 @@ def hashes_exist( ) ] - def already_cached(self, path_info): + def already_cached(self, path_info, tree): assert path_info.scheme in ["", "local"] - return super().already_cached(path_info) + return super().already_cached(path_info, tree) def _verify_link(self, path_info, link_type): if link_type == "hardlink" and self.tree.getsize(path_info) == 0: diff --git a/dvc/dependency/repo.py b/dvc/dependency/repo.py index 7cdd6310ff..65b8df286f 100644 --- a/dvc/dependency/repo.py +++ b/dvc/dependency/repo.py @@ -84,7 +84,7 @@ def download(self, to, jobs=None): follow_subrepos=False, ) - cache.checkout(to.path_info, hash_info) + cache.checkout(to.path_info, to.tree, hash_info) def update(self, rev=None): if rev: diff --git a/dvc/output/base.py b/dvc/output/base.py index 49f2a6a757..881889acb1 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -311,10 +311,17 @@ def commit(self, filter_info=None): if self.use_cache: self.cache.save( self.path_info, - self.cache.tree, + self.tree, self.hash_info, filter_info=filter_info, ) + self.cache.checkout( + self.path_info, + self.tree, + self.hash_info, + relink=True, + filter_info=filter_info, + ) self.set_exec() def dumpd(self): @@ -382,6 +389,7 @@ def checkout( try: res = self.cache.checkout( self.path_info, + self.tree, self.hash_info, force=force, progress_callback=progress_callback, diff --git a/dvc/tree/ssh/__init__.py b/dvc/tree/ssh/__init__.py index 8a2dee669b..24a49b3c7b 100644 --- a/dvc/tree/ssh/__init__.py +++ b/dvc/tree/ssh/__init__.py @@ -252,6 +252,7 @@ def getsize(self, path_info): return ssh.getsize(path_info.path) def _upload_fobj(self, fobj, to_info): + self.makedirs(to_info.parent) with self.open(to_info, mode="wb") as fdest: shutil.copyfileobj(fobj, fdest) diff --git a/tests/func/test_run_cache.py b/tests/func/test_run_cache.py index 16acab93a8..4bb7d503b8 100644 --- a/tests/func/test_run_cache.py +++ b/tests/func/test_run_cache.py @@ -168,6 +168,6 @@ def test_restore_pull(tmp_dir, dvc, run_copy, mocker, local_remote): mock_restore.assert_called_once_with(stage, pull=True) mock_run.assert_not_called() - mock_checkout.assert_called_once() + assert mock_checkout.call_count == 2 assert (tmp_dir / "bar").exists() and not (tmp_dir / "foo").unlink() assert (tmp_dir / PIPELINE_LOCK).exists() diff --git a/tests/unit/stage/test_cache.py b/tests/unit/stage/test_cache.py index eb1ce9514b..b6f56c75d6 100644 --- a/tests/unit/stage/test_cache.py +++ b/tests/unit/stage/test_cache.py @@ -47,7 +47,7 @@ def test_stage_cache(tmp_dir, dvc, mocker): stage.run() assert not run_spy.called - assert checkout_spy.call_count == 2 + assert checkout_spy.call_count == 4 assert (tmp_dir / "out").exists() assert (tmp_dir / "out_no_cache").exists() @@ -100,7 +100,7 @@ def test_stage_cache_params(tmp_dir, dvc, mocker): stage.run() assert not run_spy.called - assert checkout_spy.call_count == 2 + assert checkout_spy.call_count == 4 assert (tmp_dir / "out").exists() assert (tmp_dir / "out_no_cache").exists() @@ -154,7 +154,7 @@ def test_stage_cache_wdir(tmp_dir, dvc, mocker): stage.run() assert not run_spy.called - assert checkout_spy.call_count == 2 + assert checkout_spy.call_count == 4 assert (tmp_dir / "wdir" / "out").exists() assert (tmp_dir / "wdir" / "out_no_cache").exists()