Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remote: support saving RepoTree objects directly to cache #3825

Merged
merged 17 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,17 @@ def unprotect(self):
if self.exists:
self.remote.unprotect(self.path_info)

def get_dir_cache(self, **kwargs):
if not self.is_dir_checksum:
raise DvcException("cannot get dir cache for file checksum")
if self.cache.changed_cache_file(self.checksum):
self.repo.cloud.pull(
NamedCache.make("local", self.checksum, str(self)),
show_checksums=False,
**kwargs,
)
return self.dir_cache

def collect_used_dir_cache(
self, remote=None, force=False, jobs=None, filter_info=None
):
Expand All @@ -371,16 +382,10 @@ def collect_used_dir_cache(

cache = NamedCache()

if self.cache.changed_cache_file(self.checksum):
try:
self.repo.cloud.pull(
NamedCache.make("local", self.checksum, str(self)),
jobs=jobs,
remote=remote,
show_checksums=False,
)
except DvcException:
logger.debug(f"failed to pull cache for '{self}'")
try:
self.get_dir_cache(jobs=jobs, remote=remote)
except DvcException:
logger.debug(f"failed to pull cache for '{self}'")

if self.cache.changed_cache_file(self.checksum):
msg = (
Expand Down
117 changes: 83 additions & 34 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dvc.progress import Tqdm
from dvc.remote.index import RemoteIndex, RemoteIndexNoop
from dvc.remote.slow_link_detection import slow_link_guard
from dvc.scm.tree import is_working_tree
from dvc.state import StateNoop
from dvc.utils import tmp_fname
from dvc.utils.fs import makedirs, move
Expand Down Expand Up @@ -253,13 +254,17 @@ def get_dir_checksum(self, path_info):
raise RemoteCacheRequiredError(path_info)

dir_info = self._collect_dir(path_info)
return self._save_dir_info(dir_info, path_info)

def _save_dir_info(self, dir_info, path_info=None):
checksum, tmp_info = self._get_dir_info_checksum(dir_info)
new_info = self.cache.checksum_to_path_info(checksum)
if self.cache.changed_cache_file(checksum):
self.cache.makedirs(new_info.parent)
self.cache.move(tmp_info, new_info, mode=self.CACHE_MODE)

self.state.save(path_info, checksum)
if path_info:
self.state.save(path_info, checksum)
self.state.save(new_info, checksum)

return checksum
Expand Down Expand Up @@ -452,27 +457,33 @@ def _do_link(self, from_info, to_info, link_method):
"Created '%s': %s -> %s", self.cache_types[0], from_info, to_info,
)

def _save_file(self, path_info, checksum, save_link=True):
def _save_file(self, path_info, checksum, save_link=True, tree=None):
assert checksum

cache_info = self.checksum_to_path_info(checksum)
if self.changed_cache(checksum):
self.move(path_info, cache_info, mode=self.CACHE_MODE)
self.link(cache_info, path_info)
elif self.iscopy(path_info) and self._cache_is_copy(path_info):
# Default relink procedure involves unneeded copy
self.unprotect(path_info)
if tree:
if self.changed_cache(checksum):
with tree.open(path_info, mode="rb") as fobj:
self.copy_fobj(fobj, cache_info)
else:
self.remove(path_info)
self.link(cache_info, path_info)
if self.changed_cache(checksum):
self.move(path_info, cache_info, mode=self.CACHE_MODE)
self.link(cache_info, path_info)
elif self.iscopy(path_info) and self._cache_is_copy(path_info):
# Default relink procedure involves unneeded copy
self.unprotect(path_info)
else:
self.remove(path_info)
self.link(cache_info, path_info)
Comment on lines +469 to +477
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also think about making save() only save without checking out (these links are effectively a checkout). This if/else makes it clear that this method tries to do too much. But at the same time, move is a very nice optimization for big files that allows us to make it happen instantly if we are within the same fs.

I would be fine with keeping it as is for now, but let's at least keep this in mind.


if save_link:
self.state.save_link(path_info)
if save_link:
self.state.save_link(path_info)

# we need to update path and cache, since in case of reflink,
# or copy cache type moving original file results in updates on
# next executed command, which causes md5 recalculation
self.state.save(path_info, checksum)
if not tree or is_working_tree(tree):
self.state.save(path_info, checksum)
self.state.save(cache_info, checksum)

def _cache_is_copy(self, path_info):
Expand All @@ -497,22 +508,43 @@ def _cache_is_copy(self, path_info):
self.cache_type_confirmed = True
return self.cache_types[0] == "copy"

def _save_dir(self, path_info, checksum, save_link=True):
cache_info = self.checksum_to_path_info(checksum)
dir_info = self.get_dir_cache(checksum)
def _save_dir(self, path_info, checksum, save_link=True, tree=None):
if tree:
checksum = self._save_tree(path_info, tree)
else:
dir_info = self.get_dir_cache(checksum)
Comment on lines +512 to +515
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've discussed this before, but just to clarify: I suppose we need this because we are not ready to make _collect_dir use the tree, just yet, right? Because it uses self.walk_files, which raises another question of abstracting tree-like Remote methods somehow (maybe in RemoteTree's or something)?

Copy link
Contributor Author

@pmrowla pmrowla May 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can add tree support in _collect_dir, but it made the workflow look a bit strange to me, since _collect_dir is used in get_dir_checksum. If you go off of the current behavior for fetching external git files, we would do something along the lines of:

# get a dir checksum for the git dir via `get_dir_checksum()`/`_collect_dir()`/`tree.walk_files()`
dir_cache_info = cache.save_info(path, tree=tree)
# walk the tree again and save git file objects from tree
cache.save(path, dir_cache_info, tree=tree)

But walking the tree twice like this seems unnecessary for this use case, since we can save objects from the tree and collect the dir cache info at the same time during a single walk, and I wasn't sure if saving objects from tree during the walk would be desired behavior for _collect_dir (because saving objects in a get_dir_checksum call definitely seems like non-desired behavior).

Thinking about it now, what we could do is add the tree param to _collect_dir, and if it's called with a tree then we just save the objects from tree during walk_files, and use _collect_dir here in place of _save_tree. And when _collect_dir is used w/o a tree param, it doesn't save any objects during walk_files and behaves the same way as before for get_dir_checksum calls.


for entry in Tqdm(
dir_info, desc="Saving " + path_info.name, unit="file"
):
entry_info = path_info / entry[self.PARAM_RELPATH]
entry_checksum = entry[self.PARAM_CHECKSUM]
self._save_file(entry_info, entry_checksum, save_link=False)
for entry in Tqdm(
dir_info, desc="Saving " + path_info.name, unit="file"
):
entry_info = path_info / entry[self.PARAM_RELPATH]
entry_checksum = entry[self.PARAM_CHECKSUM]
self._save_file(entry_info, entry_checksum, save_link=False)

if save_link:
self.state.save_link(path_info)
if save_link:
self.state.save_link(path_info)

cache_info = self.checksum_to_path_info(checksum)
self.state.save(cache_info, checksum)
self.state.save(path_info, checksum)
if not tree or is_working_tree(tree):
self.state.save(path_info, checksum)

def _save_tree(self, path_info, tree):
# save tree directory to cache, collect dir cache during walk and
# return the resulting dir checksum
dir_info = []
for fname in tree.walk_files(path_info):
checksum = tree.get_file_checksum(fname)
file_info = {
self.PARAM_CHECKSUM: checksum,
self.PARAM_RELPATH: fname.relative_to(path_info).as_posix(),
}
self._save_file(fname, checksum, tree=tree)
dir_info.append(file_info)
Comment on lines +536 to +543
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual behavior for this walk depends on how tree was initialized.

Git files will always be saved directly from the tree in _save_file() via tree.open()
For DVC outs:

  • If fetch=False and stream=False, walk will not recurse into DVC out directories
  • If fetch=True, DVC outs will be fetched when walk recurses into the out directory
    • _save_file will skip the tree.open()->copy for fetched outs (since they will already exist in cache)
  • If stream=True, DVC outs will be saved by streaming in _save_file() via tree.open()

Basically, for erepo's and import/get, we probably always want the behavior from fetch=True.


return self._save_dir_info(
sorted(dir_info, key=itemgetter(self.PARAM_RELPATH))
)

def is_empty(self, path_info):
return False
Expand Down Expand Up @@ -541,22 +573,36 @@ def walk_files(self, path_info):
def protect(path_info):
pass

def save(self, path_info, checksum_info, save_link=True):
def save(self, path_info, checksum_info, save_link=True, tree=None):
if path_info.scheme != self.scheme:
raise RemoteActionNotImplemented(
f"save {path_info.scheme} -> {self.scheme}", self.scheme,
)

checksum = checksum_info[self.PARAM_CHECKSUM]
self._save(path_info, checksum, save_link)
if tree:
# save checksum will be computed during tree walk
checksum = None
else:
checksum = checksum_info[self.PARAM_CHECKSUM]
self._save(path_info, checksum, save_link, tree)

def _save(self, path_info, checksum, save_link=True, tree=None):
if tree:
logger.debug("Saving tree path '%s' to cache.", path_info)
else:
to_info = self.checksum_to_path_info(checksum)
logger.debug("Saving '%s' to '%s'.", path_info, to_info)

def _save(self, path_info, checksum, save_link=True):
to_info = self.checksum_to_path_info(checksum)
logger.debug("Saving '%s' to '%s'.", path_info, to_info)
if self.isdir(path_info):
self._save_dir(path_info, checksum, save_link)
if tree:
isdir = tree.isdir
save_link = False
else:
isdir = self.isdir
Comment on lines +596 to +600
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how RemoteTree (or something like that) becomes more and more apparent 😄

And that the state should probably belong to the tree. So here it would be a noop for git tree.

Not saying we should implement all of that right now, just noticing the things that we've discussed earlier 🙂


if isdir(path_info):
self._save_dir(path_info, checksum, save_link, tree)
return
self._save_file(path_info, checksum, save_link)
self._save_file(path_info, checksum, save_link, tree)

def _handle_transfer_exception(
self, from_info, to_info, exception, operation
Expand Down Expand Up @@ -695,6 +741,9 @@ def move(self, from_info, to_info, mode=None):
def copy(self, from_info, to_info):
raise RemoteActionNotImplemented("copy", self.scheme)

def copy_fobj(self, fobj, to_info):
raise RemoteActionNotImplemented("copy_fobj", self.scheme)

def symlink(self, from_info, to_info):
raise RemoteActionNotImplemented("symlink", self.scheme)

Expand Down
60 changes: 51 additions & 9 deletions dvc/remote/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@
)
from dvc.remote.index import RemoteIndexNoop
from dvc.scheme import Schemes
from dvc.scm.tree import is_working_tree
from dvc.scm.tree import WorkingTree, is_working_tree
from dvc.system import System
from dvc.utils import file_md5, relpath, tmp_fname
from dvc.utils.fs import copyfile, makedirs, move, remove, walk_files
from dvc.utils.fs import (
copy_fobj_to_file,
copyfile,
makedirs,
move,
remove,
walk_files,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,6 +68,21 @@ def cache_dir(self):
def cache_dir(self, value):
self.path_info = PathInfo(value) if value else None

@cached_property
def _work_tree(self):
if self.repo:
return WorkingTree(self.repo.root_dir)
return None

@property
def work_tree(self):
# When using repo.brancher, repo.tree may change to/from WorkingTree to
# GitTree arbitrarily. When repo.tree is GitTree, local cache needs to
# use its own WorkingTree instance.
if self.repo and not is_working_tree(self.repo.tree):
return self._work_tree
return None

@classmethod
def supported(cls, config):
return True
Expand Down Expand Up @@ -92,8 +114,11 @@ def get(self, md5):
return self.checksum_to_path_info(md5).url

def exists(self, path_info):
assert is_working_tree(self.repo.tree)
assert isinstance(path_info, str) or path_info.scheme == "local"
if not self.repo:
return os.path.exists(path_info)
if self.work_tree and self.work_tree.exists(path_info):
return True
return self.repo.tree.exists(path_info)

def makedirs(self, path_info):
Expand Down Expand Up @@ -126,13 +151,19 @@ def is_empty(self, path_info):

return False

@staticmethod
def isfile(path_info):
return os.path.isfile(path_info)
def isfile(self, path_info):
if not self.repo:
return os.path.isfile(path_info)
if self.work_tree and self.work_tree.isfile(path_info):
return True
return self.repo.tree.isfile(path_info)

@staticmethod
def isdir(path_info):
return os.path.isdir(path_info)
def isdir(self, path_info):
if not self.repo:
return os.path.isdir(path_info)
if self.work_tree and self.work_tree.isdir(path_info):
return True
return self.repo.tree.isdir(path_info)

def iscopy(self, path_info):
return not (
Expand Down Expand Up @@ -187,6 +218,17 @@ def copy(self, from_info, to_info):
self.remove(tmp_info)
raise

def copy_fobj(self, fobj, to_info):
self.makedirs(to_info.parent)
tmp_info = to_info.parent / tmp_fname(to_info.name)
try:
copy_fobj_to_file(fobj, tmp_info)
os.chmod(tmp_info, self._file_mode)
os.rename(tmp_info, to_info)
except Exception:
self.remove(tmp_info)
raise

@staticmethod
def symlink(from_info, to_info):
System.symlink(from_info, to_info)
Expand Down
17 changes: 9 additions & 8 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,17 +488,18 @@ def open_by_relpath(self, path, remote=None, mode="r", encoding=None):
tree = RepoTree(self, stream=True)
path = os.path.join(self.root_dir, path)
try:
with tree.open(
os.path.join(self.root_dir, path),
mode=mode,
encoding=encoding,
remote=remote,
) as fobj:
yield fobj
with self.state:
with tree.open(
os.path.join(self.root_dir, path),
mode=mode,
encoding=encoding,
remote=remote,
) as fobj:
yield fobj
except FileNotFoundError as exc:
raise FileMissingError(path) from exc
except IsADirectoryError as exc:
raise DvcIsADirectoryError from exc
raise DvcIsADirectoryError(f"'{path}' is a directory") from exc

def close(self):
self.scm.close()
Expand Down
Loading