Skip to content

Commit

Permalink
cache: separate save() and checkout() (iterative#5412)
Browse files: browse the repository at this point in the history
  • Loading branch information
efiop authored Feb 4, 2021
1 parent e7cc776 commit 8676617
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 100 deletions.
186 changes: 94 additions & 92 deletions dvc/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def _filter_hash_info(self, hash_info, path_info, filter_info):

return self._get_dir_info_hash(filtered_dir_info)[0]

def changed(self, path_info, hash_info, filter_info=None):
def changed(self, path_info, tree, hash_info, filter_info=None):
"""Checks if data has changed.
A file is considered changed if:
Expand All @@ -138,7 +138,7 @@ def changed(self, path_info, hash_info, filter_info=None):
"checking if '%s'('%s') has changed.", path_info, hash_info
)

if not self.tree.exists(path):
if not tree.exists(path):
logger.debug("'%s' doesn't exist.", path)
return True

Expand All @@ -151,7 +151,7 @@ def changed(self, path_info, hash_info, filter_info=None):
logger.debug("cache for '%s'('%s') has changed.", path, hi)
return True

actual = self.tree.get_hash(path)
actual = tree.get_hash(path)
if hi != actual:
logger.debug(
"hash value '%s' for '%s' has changed (actual '%s').",
Expand Down Expand Up @@ -218,40 +218,25 @@ def _do_link(self, from_info, to_info, link_method):
"Created '%s': %s -> %s", self.cache_types[0], from_info, to_info,
)

def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
def _save_file(self, path_info, tree, hash_info, **kwargs):
assert hash_info

cache_info = self.tree.hash_to_path_info(hash_info.value)
if tree == self.tree:
if self.changed_cache(hash_info):
self.makedirs(cache_info.parent)
self.move(path_info, cache_info)
self.protect(cache_info)
self.link(cache_info, path_info)
elif self.tree.iscopy(path_info) and self._cache_is_copy(
path_info
):
# Default relink procedure involves unneeded copy
self.unprotect(path_info)
if self.changed_cache(hash_info):
# using our makedirs to create dirs with proper permissions
self.makedirs(cache_info.parent)
if isinstance(tree, type(self.tree)):
self.tree.move(path_info, cache_info)
else:
self.tree.remove(path_info)
self.link(cache_info, path_info)

if save_link:
self.tree.state.save_link(path_info)
# we need to update path and cache, since with the reflink or copy
# cache type, moving the original file results in updates on the next
# executed command, which causes md5 recalculation
self.tree.state.save(path_info, hash_info)
else:
if self.changed_cache(hash_info):
with tree.open(path_info, mode="rb") as fobj:
self.tree.upload_fobj(fobj, cache_info)
callback = kwargs.get("download_callback")
if callback:
callback(1)
tree.state.save(path_info, hash_info)
self.protect(cache_info)
self.tree.state.save(cache_info, hash_info)

self.tree.state.save(cache_info, hash_info)
callback = kwargs.get("download_callback")
if callback:
callback(1)

def _transfer_file(self, from_tree, from_info):
from dvc.utils import tmp_fname
Expand Down Expand Up @@ -399,13 +384,7 @@ def save_dir_info(self, dir_info, hash_info=None):
return hi

def _save_dir(
self,
path_info,
tree,
hash_info,
save_link=True,
filter_info=None,
**kwargs,
self, path_info, tree, hash_info, filter_info=None, **kwargs,
):
if not hash_info.dir_info:
hash_info.dir_info = tree.cache.get_dir_cache(hash_info)
Expand All @@ -418,20 +397,14 @@ def _save_dir(
if filter_info and not entry_info.isin_or_eq(filter_info):
continue

self._save_file(
entry_info, tree, entry_hash, save_link=False, **kwargs
)

if save_link:
self.tree.state.save_link(path_info)
if self.tree.exists(path_info):
self.tree.state.save(path_info, hi)
self._save_file(entry_info, tree, entry_hash, **kwargs)

cache_info = self.tree.hash_to_path_info(hi.value)
self.tree.state.save(cache_info, hi)
tree.state.save(path_info, hi)

@use_state
def save(self, path_info, tree, hash_info, save_link=True, **kwargs):
def save(self, path_info, tree, hash_info, **kwargs):
if path_info.scheme != self.tree.scheme:
raise RemoteActionNotImplemented(
f"save {path_info.scheme} -> {self.tree.scheme}",
Expand All @@ -447,17 +420,17 @@ def save(self, path_info, tree, hash_info, save_link=True, **kwargs):
errno.ENOENT, os.strerror(errno.ENOENT), path_info
)

self._save(path_info, tree, hash_info, save_link, **kwargs)
self._save(path_info, tree, hash_info, **kwargs)
return hash_info

def _save(self, path_info, tree, hash_info, save_link=True, **kwargs):
def _save(self, path_info, tree, hash_info, **kwargs):
to_info = self.tree.hash_to_path_info(hash_info.value)
logger.debug("Saving '%s' to '%s'.", path_info, to_info)

if tree.isdir(path_info):
self._save_dir(path_info, tree, hash_info, save_link, **kwargs)
self._save_dir(path_info, tree, hash_info, **kwargs)
else:
self._save_file(path_info, tree, hash_info, save_link, **kwargs)
self._save_file(path_info, tree, hash_info, **kwargs)

# Override to return path as a string instead of PathInfo for clouds
# which support string paths (see local)
Expand Down Expand Up @@ -542,19 +515,19 @@ def changed_cache(self, hash_info, path_info=None, filter_info=None):
)
return self.changed_cache_file(hash_info)

def already_cached(self, path_info):
current = self.tree.get_hash(path_info)
def already_cached(self, path_info, tree):
current = tree.get_hash(path_info)

if not current:
return False

return not self.changed_cache(current)

def safe_remove(self, path_info, force=False):
if not self.tree.exists(path_info):
def safe_remove(self, path_info, tree, force=False):
if not tree.exists(path_info):
return

if not force and not self.already_cached(path_info):
if not force and not self.already_cached(path_info, tree):
msg = (
"file '{}' is going to be removed."
" Are you sure you want to proceed?".format(str(path_info))
Expand All @@ -563,30 +536,48 @@ def safe_remove(self, path_info, force=False):
if not prompt.confirm(msg):
raise ConfirmRemoveError(str(path_info))

self.tree.remove(path_info)
tree.remove(path_info)

def _checkout_file(
self, path_info, hash_info, force, progress_callback=None, relink=False
self,
path_info,
tree,
hash_info,
force,
progress_callback=None,
relink=False,
):
"""The file is changed we need to checkout a new copy"""
added, modified = True, False
cache_info = self.tree.hash_to_path_info(hash_info.value)
if self.tree.exists(path_info):
logger.debug("data '%s' will be replaced.", path_info)
self.safe_remove(path_info, force=force)
added, modified = False, True

self.link(cache_info, path_info)
self.tree.state.save_link(path_info)
self.tree.state.save(path_info, hash_info)
if tree.exists(path_info):
added = False

if not relink and self.changed(path_info, tree, hash_info):
modified = True
self.safe_remove(path_info, tree, force=force)
self.link(cache_info, path_info)
else:
modified = False

if tree.iscopy(path_info) and self._cache_is_copy(path_info):
self.unprotect(path_info)
else:
self.safe_remove(path_info, tree, force=force)
self.link(cache_info, path_info)
else:
self.link(cache_info, path_info)
added, modified = True, False

tree.state.save(path_info, hash_info)
if progress_callback:
progress_callback(str(path_info))

return added, modified and not relink
return added, modified

def _checkout_dir(
self,
path_info,
tree,
hash_info,
force,
progress_callback=None,
Expand All @@ -596,7 +587,7 @@ def _checkout_dir(
added, modified = False, False
# Create dir separately so that dir is created
# even if there are no files in it
if not self.tree.exists(path_info):
if not tree.exists(path_info):
added = True
self.makedirs(path_info)

Expand All @@ -605,46 +596,45 @@ def _checkout_dir(
logger.debug("Linking directory '%s'.", path_info)

for entry_info, entry_hash_info in dir_info.items(path_info):
entry_cache_info = self.tree.hash_to_path_info(
entry_hash_info.value
)

if filter_info and not entry_info.isin_or_eq(filter_info):
continue

if relink or self.changed(entry_info, entry_hash_info):
entry_added, entry_modified = self._checkout_file(
entry_info,
tree,
entry_hash_info,
force,
progress_callback,
relink,
)
if entry_added or entry_modified:
modified = True
self.safe_remove(entry_info, force=force)
self.link(entry_cache_info, entry_info)
self.tree.state.save(entry_info, entry_hash_info)
if progress_callback:
progress_callback(str(entry_info))

modified = (
self._remove_redundant_files(path_info, dir_info, force)
self._remove_redundant_files(path_info, tree, dir_info, force)
or modified
)

self.tree.state.save_link(path_info)
self.tree.state.save(path_info, hash_info)
tree.state.save(path_info, hash_info)

# a relink does not count as a modification; report it as no change
return added, not added and modified and not relink

def _remove_redundant_files(self, path_info, dir_info, force):
existing_files = set(self.tree.walk_files(path_info))
def _remove_redundant_files(self, path_info, tree, dir_info, force):
existing_files = set(tree.walk_files(path_info))

needed_files = {info for info, _ in dir_info.items(path_info)}
redundant_files = existing_files - needed_files
for path in redundant_files:
self.safe_remove(path, force)
self.safe_remove(path, tree, force)

return bool(redundant_files)

@use_state
def checkout(
self,
path_info,
tree,
hash_info,
force=False,
progress_callback=None,
Expand All @@ -663,11 +653,11 @@ def checkout(
"No file hash info found for '%s'. It won't be created.",
path_info,
)
self.safe_remove(path_info, force=force)
self.safe_remove(path_info, tree, force=force)
failed = path_info

elif not relink and not self.changed(
path_info, hash_info, filter_info=filter_info
path_info, tree, hash_info, filter_info=filter_info
):
logger.trace("Data '%s' didn't change.", path_info)
skip = True
Expand All @@ -681,7 +671,7 @@ def checkout(
hash_info,
path_info,
)
self.safe_remove(path_info, force=force)
self.safe_remove(path_info, tree, force=force)
failed = path_info

if failed or skip:
Expand All @@ -702,6 +692,7 @@ def checkout(

return self._checkout(
path_info,
tree,
hash_info,
force,
progress_callback,
Expand All @@ -712,20 +703,31 @@ def checkout(
def _checkout(
self,
path_info,
tree,
hash_info,
force=False,
progress_callback=None,
relink=False,
filter_info=None,
):
if not hash_info.isdir:
return self._checkout_file(
path_info, hash_info, force, progress_callback, relink
ret = self._checkout_file(
path_info, tree, hash_info, force, progress_callback, relink
)
else:
ret = self._checkout_dir(
path_info,
tree,
hash_info,
force,
progress_callback,
relink,
filter_info,
)

return self._checkout_dir(
path_info, hash_info, force, progress_callback, relink, filter_info
)
tree.state.save_link(path_info)

return ret

def get_files_number(self, path_info, hash_info, filter_info):
from funcy.py3 import ilen
Expand Down
4 changes: 2 additions & 2 deletions dvc/cache/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def hashes_exist(
)
]

def already_cached(self, path_info):
def already_cached(self, path_info, tree):
assert path_info.scheme in ["", "local"]

return super().already_cached(path_info)
return super().already_cached(path_info, tree)

def _verify_link(self, path_info, link_type):
if link_type == "hardlink" and self.tree.getsize(path_info) == 0:
Expand Down
2 changes: 1 addition & 1 deletion dvc/dependency/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def download(self, to, jobs=None):
follow_subrepos=False,
)

cache.checkout(to.path_info, hash_info)
cache.checkout(to.path_info, to.tree, hash_info)

def update(self, rev=None):
if rev:
Expand Down
Loading

0 comments on commit 8676617

Please sign in to comment.