Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remote: use string paths over PathInfo for performance reasons #3672

Merged
merged 9 commits into from
Apr 28, 2020
10 changes: 7 additions & 3 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def is_dir_checksum(cls, checksum):
return checksum.endswith(cls.CHECKSUM_DIR_SUFFIX)

def get_checksum(self, path_info):
assert path_info.scheme == self.scheme
assert isinstance(path_info, str) or path_info.scheme == self.scheme

if not self.exists(path_info):
return None
Expand Down Expand Up @@ -719,6 +719,10 @@ def path_to_checksum(self, path):
def checksum_to_path_info(self, checksum):
return self.path_info / checksum[0:2] / checksum[2:]

# Return path as a string instead of PathInfo for remotes which support
# string paths (see local)
checksum_to_path = checksum_to_path_info

def list_cache_paths(self, prefix=None, progress_callback=None):
raise NotImplementedError

Expand Down Expand Up @@ -796,8 +800,8 @@ def changed_cache_file(self, checksum):

- Remove the file from cache if it doesn't match the actual checksum
"""

cache_info = self.checksum_to_path_info(checksum)
# Prefer string path over PathInfo when possible due to performance
cache_info = self.checksum_to_path(checksum)
if self.is_protected(cache_info):
logger.debug(
"Assuming '%s' is unchanged since it is read-only", cache_info
Expand Down
79 changes: 44 additions & 35 deletions dvc/remote/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from concurrent.futures import as_completed, ThreadPoolExecutor
from functools import partial

from funcy import concat
from funcy import cached_property, concat

from shortuuid import uuid

Expand Down Expand Up @@ -67,6 +67,13 @@ def cache_dir(self, value):
def supported(cls, config):
return True

@cached_property
def cache_path(self):
return os.path.abspath(self.cache_dir)
Copy link
Contributor Author

@pmrowla pmrowla Apr 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If path is not abspath or relpath from current working dir, posix lstat() syscall runtime doubles. PathInfo.__str__ uses relpath from cwd, but since cwd may change during runtime (like in some of our erepo tests), we can't cache relpath(self.cache_dir) and we don't want to compute it each time, so we just use abspath.


def checksum_to_path(self, checksum):
return os.path.join(self.cache_path, checksum[0:2], checksum[2:])

def list_cache_paths(self, prefix=None, progress_callback=None):
assert self.path_info is not None
if prefix:
Expand All @@ -88,7 +95,7 @@ def get(self, md5):

def exists(self, path_info):
assert is_working_tree(self.repo.tree)
assert path_info.scheme == "local"
assert isinstance(path_info, str) or path_info.scheme == "local"
return self.repo.tree.exists(fspath_py35(path_info))

def makedirs(self, path_info):
Expand Down Expand Up @@ -148,11 +155,15 @@ def get_file_checksum(self, path_info):
return file_md5(path_info)[0]

def remove(self, path_info):
if path_info.scheme != "local":
raise NotImplementedError
if isinstance(path_info, PathInfo):
if path_info.scheme != "local":
raise NotImplementedError
path = path_info.fspath
else:
path = path_info

if self.exists(path_info):
remove(path_info.fspath)
if self.exists(path):
remove(path)

def move(self, from_info, to_info, mode=None):
if from_info.scheme != "local" or to_info.scheme != "local":
Expand Down Expand Up @@ -285,11 +296,11 @@ def _status(
show_checksums=False,
download=False,
):
"""Return a tuple of (dir_status_info, file_status_info, dir_mapping).
"""Return a tuple of (dir_status_info, file_status_info, dir_contents).

dir_status_info contains status for .dir files, file_status_info
contains status for all other files, and dir_mapping is a dict of
{dir_path_info: set(file_path_info...)} which can be used to map
contains status for all other files, and dir_contents is a dict of
{dir_checksum: set(file_checksum, ...)} which can be used to map
a .dir file to its file contents.
"""
logger.debug(
Expand Down Expand Up @@ -324,30 +335,27 @@ def _status(
)
)
return self._make_status(
named_cache, remote, show_checksums, local_exists, remote_exists
named_cache, show_checksums, local_exists, remote_exists
)

def _make_status(
self, named_cache, remote, show_checksums, local_exists, remote_exists
self, named_cache, show_checksums, local_exists, remote_exists
):
def make_names(checksum, names):
return {"name": checksum if show_checksums else " ".join(names)}

dir_status = {}
file_status = {}
dir_paths = {}
dir_contents = {}
for checksum, item in named_cache[self.scheme].items():
if item.children:
dir_status[checksum] = make_names(checksum, item.names)
file_status.update(
{
child_checksum: make_names(child_checksum, child.names)
for child_checksum, child in item.children.items()
}
)
dir_paths[remote.checksum_to_path_info(checksum)] = frozenset(
map(remote.checksum_to_path_info, item.child_keys())
)
dir_contents[checksum] = set()
for child_checksum, child in item.children.items():
file_status[child_checksum] = make_names(
child_checksum, child.names
)
dir_contents[checksum].add(child_checksum)
else:
file_status[checksum] = make_names(checksum, item.names)

Expand All @@ -356,7 +364,7 @@ def make_names(checksum, names):

self._log_missing_caches(dict(dir_status, **file_status))

return dir_status, file_status, dir_paths
return dir_status, file_status, dir_contents

def _indexed_dir_checksums(self, named_cache, remote, dir_md5s):
# Validate our index by verifying all indexed .dir checksums
Expand Down Expand Up @@ -409,13 +417,15 @@ def _get_plans(self, download, remote, status_info, status):
cache = []
path_infos = []
names = []
checksums = []
for md5, info in Tqdm(
status_info.items(), desc="Analysing status", unit="file"
):
if info["status"] == status:
cache.append(self.checksum_to_path_info(md5))
path_infos.append(remote.checksum_to_path_info(md5))
names.append(info["name"])
checksums.append(md5)

if download:
to_infos = cache
Expand All @@ -424,7 +434,7 @@ def _get_plans(self, download, remote, status_info, status):
to_infos = path_infos
from_infos = cache

return from_infos, to_infos, names
return from_infos, to_infos, names, checksums

def _process(
self,
Expand Down Expand Up @@ -457,7 +467,7 @@ def _process(
if jobs is None:
jobs = remote.JOBS

dir_status, file_status, dir_paths = self._status(
dir_status, file_status, dir_contents = self._status(
named_cache,
remote,
jobs=jobs,
Expand All @@ -482,18 +492,20 @@ def _process(
# for uploads, push files first, and any .dir files last

file_futures = {}
for from_info, to_info, name in zip(*file_plans):
file_futures[to_info] = executor.submit(
for from_info, to_info, name, checksum in zip(*file_plans):
file_futures[checksum] = executor.submit(
func, from_info, to_info, name
)
dir_futures = {}
for from_info, to_info, name in zip(*dir_plans):
for from_info, to_info, name, dir_checksum in zip(
*dir_plans
):
wait_futures = {
future
for file_path, future in file_futures.items()
if file_path in dir_paths[to_info]
for file_checksum, future in file_futures.items()
if file_checksum in dir_contents[dir_checksum]
}
dir_futures[to_info] = executor.submit(
dir_futures[dir_checksum] = executor.submit(
self._dir_upload,
func,
wait_futures,
Expand All @@ -516,12 +528,9 @@ def _process(

if not download:
# index successfully pushed dirs
for to_info, future in dir_futures.items():
for dir_checksum, future in dir_futures.items():
if future.result() == 0:
dir_checksum = remote.path_to_checksum(str(to_info))
file_checksums = list(
named_cache.child_keys(self.scheme, dir_checksum)
)
file_checksums = dir_contents[dir_checksum]
logger.debug(
"Indexing pushed dir '{}' with "
"'{}' nested files".format(
Expand Down
6 changes: 3 additions & 3 deletions dvc/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def save(self, path_info, checksum):
path_info (dict): path_info to save checksum for.
checksum (str): checksum to save.
"""
assert path_info.scheme == "local"
assert isinstance(path_info, str) or path_info.scheme == "local"
assert checksum is not None
assert os.path.exists(fspath_py35(path_info))

Expand Down Expand Up @@ -398,7 +398,7 @@ def get(self, path_info):
str or None: checksum for the specified path info or None if it
doesn't exist in the state database.
"""
assert path_info.scheme == "local"
assert isinstance(path_info, str) or path_info.scheme == "local"
path = fspath_py35(path_info)

if not os.path.exists(path):
Expand All @@ -425,7 +425,7 @@ def save_link(self, path_info):
Args:
path_info (dict): path info to add to the list of links.
"""
assert path_info.scheme == "local"
assert isinstance(path_info, str) or path_info.scheme == "local"

if not os.path.exists(fspath_py35(path_info)):
return
Expand Down