Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

push/pull: naively transfer run cache #3746

Merged
merged 1 commit into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions dvc/command/data_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def run(self):
with_deps=self.args.with_deps,
force=self.args.force,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary(stats)
except (CheckoutError, DvcException) as exc:
Expand All @@ -54,6 +55,7 @@ def run(self):
all_commits=self.args.all_commits,
with_deps=self.args.with_deps,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary({"pushed": processed_files_count})
except DvcException:
Expand All @@ -74,6 +76,7 @@ def run(self):
all_commits=self.args.all_commits,
with_deps=self.args.with_deps,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary({"fetched": processed_files_count})
except DvcException:
Expand Down Expand Up @@ -163,6 +166,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Pull cache for subdirectories of the specified directory.",
)
pull_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
pull_parser.set_defaults(func=CmdDataPull)

# Push
Expand Down Expand Up @@ -212,6 +221,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Push cache for subdirectories of specified directory.",
)
push_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
push_parser.set_defaults(func=CmdDataPush)

# Fetch
Expand Down Expand Up @@ -267,6 +282,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Fetch cache for subdirectories of specified directory.",
)
fetch_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
fetch_parser.set_defaults(func=CmdDataFetch)

# Status
Expand Down
32 changes: 26 additions & 6 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ def get_remote(self, name=None, command="<command>"):
def _init_remote(self, name):
return Remote(self.repo, name=name)

def push(self, cache, jobs=None, remote=None, show_checksums=False):
def push(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
):
"""Push data items in a cloud-agnostic way.

Args:
Expand All @@ -58,14 +65,23 @@ def push(self, cache, jobs=None, remote=None, show_checksums=False):
show_checksums (bool): show checksums instead of file names in
information messages.
"""
remote = self.get_remote(remote, "push")

if run_cache:
self.repo.stage_cache.push(remote)

return self.repo.cache.local.push(
cache,
jobs=jobs,
remote=self.get_remote(remote, "push"),
show_checksums=show_checksums,
cache, jobs=jobs, remote=remote, show_checksums=show_checksums,
)

def pull(self, cache, jobs=None, remote=None, show_checksums=False):
def pull(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
):
"""Pull data items in a cloud-agnostic way.

Args:
Expand All @@ -77,6 +93,10 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False):
information messages.
"""
remote = self.get_remote(remote, "pull")

if run_cache:
self.repo.stage_cache.pull(remote)

downloaded_items_num = self.repo.cache.local.pull(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums
)
Expand Down
2 changes: 1 addition & 1 deletion dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(self, root_dir=None):
self.cache = Cache(self)
self.cloud = DataCloud(self)

self.stage_cache = StageCache(self.cache.local.cache_dir)
self.stage_cache = StageCache(self)

self.metrics = Metrics(self)
self.params = Params(self)
Expand Down
7 changes: 6 additions & 1 deletion dvc/repo/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def _fetch(
all_tags=False,
recursive=False,
all_commits=False,
run_cache=False,
):
"""Download data items from a cloud and imported repositories

Expand Down Expand Up @@ -50,7 +51,11 @@ def _fetch(

try:
downloaded += self.cloud.pull(
used, jobs, remote=remote, show_checksums=show_checksums
used,
jobs,
remote=remote,
show_checksums=show_checksums,
run_cache=run_cache,
)
except NoRemoteError:
if not used.external and used["local"]:
Expand Down
2 changes: 2 additions & 0 deletions dvc/repo/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def pull(
force=False,
recursive=False,
all_commits=False,
run_cache=False,
):
processed_files_count = self._fetch(
targets,
Expand All @@ -27,6 +28,7 @@ def pull(
all_commits=all_commits,
with_deps=with_deps,
recursive=recursive,
run_cache=run_cache,
)
stats = self._checkout(
targets=targets, with_deps=with_deps, force=force, recursive=recursive
Expand Down
4 changes: 3 additions & 1 deletion dvc/repo/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def push(
all_tags=False,
recursive=False,
all_commits=False,
run_cache=False,
):
used = self.used_cache(
targets,
Expand All @@ -24,4 +25,5 @@ def push(
jobs=jobs,
recursive=recursive,
)
return self.cloud.push(used, jobs, remote=remote)

return self.cloud.push(used, jobs, remote=remote, run_cache=run_cache)
27 changes: 25 additions & 2 deletions dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os

import yaml
from funcy import first
from voluptuous import Invalid

from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
Expand Down Expand Up @@ -36,8 +37,9 @@ def _get_stage_hash(stage):


class StageCache:
def __init__(self, cache_dir):
self.cache_dir = os.path.join(cache_dir, "runs")
def __init__(self, repo):
self.repo = repo
self.cache_dir = os.path.join(repo.cache.local.cache_dir, "runs")

def _get_cache_dir(self, key):
return os.path.join(self.cache_dir, key[:2], key)
Expand Down Expand Up @@ -101,3 +103,24 @@ def restore(self, stage):
if not cache:
return
StageLoader.fill_from_lock(stage, cache)

@staticmethod
def _transfer(func, from_remote, to_remote):
runs = from_remote.path_info / "runs"
if not from_remote.exists(runs):
return

for src in from_remote.walk_files(runs):
rel = src.relative_to(from_remote.path_info)
dst = to_remote.path_info / rel
key = dst.parent
# check if any build cache already exists for this key
if to_remote.exists(key) and first(to_remote.walk_files(key)):
continue
func(src, dst)

def push(self, remote):
return self._transfer(remote.upload, self.repo.cache.local, remote)

def pull(self, remote):
return self._transfer(remote.download, remote, self.repo.cache.local)
2 changes: 2 additions & 0 deletions tests/func/test_data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ def test_pipeline_file_target_ops(tmp_dir, dvc, local_remote, run_copy):

outs = ["foo", "bar", "lorem", "ipsum", "baz", "lorem2"]

remove(dvc.stage_cache.cache_dir)

dvc.push()
# each one's a copy of other, hence 3
assert len(recurse_list_dir(fspath_py35(local_remote))) == 3
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/command/test_data_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_fetch(mocker):
"--all-commits",
"--with-deps",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataFetch
Expand All @@ -35,6 +36,7 @@ def test_fetch(mocker):
all_commits=True,
with_deps=True,
recursive=True,
run_cache=True,
)


Expand All @@ -54,6 +56,7 @@ def test_pull(mocker):
"--with-deps",
"--force",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataPull
Expand All @@ -73,6 +76,7 @@ def test_pull(mocker):
with_deps=True,
force=True,
recursive=True,
run_cache=True,
)


Expand All @@ -91,6 +95,7 @@ def test_push(mocker):
"--all-commits",
"--with-deps",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataPush
Expand All @@ -109,4 +114,5 @@ def test_push(mocker):
all_commits=True,
with_deps=True,
recursive=True,
run_cache=True,
)