Skip to content

Commit

Permalink
push/pull: naively transfer run cache (#3746)
Browse files Browse the repository at this point in the history
This is the simplest (aka dumb) implementation of this
functionality, which is needed to unblock CI/CD development.
  • Loading branch information
efiop authored May 7, 2020
1 parent 9714b14 commit 785dcfe
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 11 deletions.
21 changes: 21 additions & 0 deletions dvc/command/data_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def run(self):
with_deps=self.args.with_deps,
force=self.args.force,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary(stats)
except (CheckoutError, DvcException) as exc:
Expand All @@ -54,6 +55,7 @@ def run(self):
all_commits=self.args.all_commits,
with_deps=self.args.with_deps,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary({"pushed": processed_files_count})
except DvcException:
Expand All @@ -74,6 +76,7 @@ def run(self):
all_commits=self.args.all_commits,
with_deps=self.args.with_deps,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary({"fetched": processed_files_count})
except DvcException:
Expand Down Expand Up @@ -163,6 +166,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Pull cache for subdirectories of the specified directory.",
)
pull_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
pull_parser.set_defaults(func=CmdDataPull)

# Push
Expand Down Expand Up @@ -212,6 +221,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Push cache for subdirectories of specified directory.",
)
push_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
push_parser.set_defaults(func=CmdDataPush)

# Fetch
Expand Down Expand Up @@ -267,6 +282,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Fetch cache for subdirectories of specified directory.",
)
fetch_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
fetch_parser.set_defaults(func=CmdDataFetch)

# Status
Expand Down
32 changes: 26 additions & 6 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ def get_remote(self, name=None, command="<command>"):
def _init_remote(self, name):
return Remote(self.repo, name=name)

def push(self, cache, jobs=None, remote=None, show_checksums=False):
def push(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
):
"""Push data items in a cloud-agnostic way.
Args:
Expand All @@ -58,14 +65,23 @@ def push(self, cache, jobs=None, remote=None, show_checksums=False):
show_checksums (bool): show checksums instead of file names in
information messages.
"""
remote = self.get_remote(remote, "push")

if run_cache:
self.repo.stage_cache.push(remote)

return self.repo.cache.local.push(
cache,
jobs=jobs,
remote=self.get_remote(remote, "push"),
show_checksums=show_checksums,
cache, jobs=jobs, remote=remote, show_checksums=show_checksums,
)

def pull(self, cache, jobs=None, remote=None, show_checksums=False):
def pull(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
):
"""Pull data items in a cloud-agnostic way.
Args:
Expand All @@ -77,6 +93,10 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False):
information messages.
"""
remote = self.get_remote(remote, "pull")

if run_cache:
self.repo.stage_cache.pull(remote)

downloaded_items_num = self.repo.cache.local.pull(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums
)
Expand Down
2 changes: 1 addition & 1 deletion dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(self, root_dir=None):
self.cache = Cache(self)
self.cloud = DataCloud(self)

self.stage_cache = StageCache(self.cache.local.cache_dir)
self.stage_cache = StageCache(self)

self.metrics = Metrics(self)
self.params = Params(self)
Expand Down
7 changes: 6 additions & 1 deletion dvc/repo/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def _fetch(
all_tags=False,
recursive=False,
all_commits=False,
run_cache=False,
):
"""Download data items from a cloud and imported repositories
Expand Down Expand Up @@ -50,7 +51,11 @@ def _fetch(

try:
downloaded += self.cloud.pull(
used, jobs, remote=remote, show_checksums=show_checksums
used,
jobs,
remote=remote,
show_checksums=show_checksums,
run_cache=run_cache,
)
except NoRemoteError:
if not used.external and used["local"]:
Expand Down
2 changes: 2 additions & 0 deletions dvc/repo/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def pull(
force=False,
recursive=False,
all_commits=False,
run_cache=False,
):
processed_files_count = self._fetch(
targets,
Expand All @@ -27,6 +28,7 @@ def pull(
all_commits=all_commits,
with_deps=with_deps,
recursive=recursive,
run_cache=run_cache,
)
stats = self._checkout(
targets=targets, with_deps=with_deps, force=force, recursive=recursive
Expand Down
4 changes: 3 additions & 1 deletion dvc/repo/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def push(
all_tags=False,
recursive=False,
all_commits=False,
run_cache=False,
):
used = self.used_cache(
targets,
Expand All @@ -24,4 +25,5 @@ def push(
jobs=jobs,
recursive=recursive,
)
return self.cloud.push(used, jobs, remote=remote)

return self.cloud.push(used, jobs, remote=remote, run_cache=run_cache)
27 changes: 25 additions & 2 deletions dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os

import yaml
from funcy import first
from voluptuous import Invalid

from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
Expand Down Expand Up @@ -36,8 +37,9 @@ def _get_stage_hash(stage):


class StageCache:
def __init__(self, cache_dir):
self.cache_dir = os.path.join(cache_dir, "runs")
def __init__(self, repo):
self.repo = repo
self.cache_dir = os.path.join(repo.cache.local.cache_dir, "runs")

def _get_cache_dir(self, key):
    """Return the directory used for ``key`` inside the run cache.

    The result is ``<self.cache_dir>/<first two characters of key>/<key>``.
    """
    prefix = key[:2]
    return os.path.join(self.cache_dir, prefix, key)
Expand Down Expand Up @@ -101,3 +103,24 @@ def restore(self, stage):
if not cache:
return
StageLoader.fill_from_lock(stage, cache)

@staticmethod
def _transfer(func, from_remote, to_remote):
    """Copy run-cache files from ``from_remote`` to ``to_remote``.

    Every file found under ``runs`` on ``from_remote`` is mapped to the
    same relative path on ``to_remote`` and handed to ``func(src, dst)``.
    A file is skipped when its destination parent directory already
    exists and contains at least one file. Nothing happens when
    ``from_remote`` has no ``runs`` directory.

    Args:
        func: callable invoked as ``func(src, dst)`` for each file to move.
        from_remote: source object providing ``path_info``, ``exists``
            and ``walk_files``.
        to_remote: destination object providing the same members.
    """
    source_root = from_remote.path_info
    runs_dir = source_root / "runs"
    if not from_remote.exists(runs_dir):
        return

    for source in from_remote.walk_files(runs_dir):
        target = to_remote.path_info / source.relative_to(source_root)
        key_dir = target.parent
        # A key that already holds any build cache on the destination is
        # left untouched.
        key_is_populated = to_remote.exists(key_dir) and first(
            to_remote.walk_files(key_dir)
        )
        if not key_is_populated:
            func(source, target)

def push(self, remote):
    """Send the local run cache to ``remote``.

    Delegates to ``_transfer`` with ``remote.upload`` as the copy
    function, the repo's local cache as the source and ``remote`` as the
    destination.
    """
    local_cache = self.repo.cache.local
    return self._transfer(remote.upload, local_cache, remote)

def pull(self, remote):
    """Bring the run cache from ``remote`` into the local cache.

    Delegates to ``_transfer`` with ``remote.download`` as the copy
    function, ``remote`` as the source and the repo's local cache as the
    destination.
    """
    local_cache = self.repo.cache.local
    return self._transfer(remote.download, remote, local_cache)
2 changes: 2 additions & 0 deletions tests/func/test_data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ def test_pipeline_file_target_ops(tmp_dir, dvc, local_remote, run_copy):

outs = ["foo", "bar", "lorem", "ipsum", "baz", "lorem2"]

remove(dvc.stage_cache.cache_dir)

dvc.push()
# each one's a copy of other, hence 3
assert len(recurse_list_dir(fspath_py35(local_remote))) == 3
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/command/test_data_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_fetch(mocker):
"--all-commits",
"--with-deps",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataFetch
Expand All @@ -35,6 +36,7 @@ def test_fetch(mocker):
all_commits=True,
with_deps=True,
recursive=True,
run_cache=True,
)


Expand All @@ -54,6 +56,7 @@ def test_pull(mocker):
"--with-deps",
"--force",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataPull
Expand All @@ -73,6 +76,7 @@ def test_pull(mocker):
with_deps=True,
force=True,
recursive=True,
run_cache=True,
)


Expand All @@ -91,6 +95,7 @@ def test_push(mocker):
"--all-commits",
"--with-deps",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataPush
Expand All @@ -109,4 +114,5 @@ def test_push(mocker):
all_commits=True,
with_deps=True,
recursive=True,
run_cache=True,
)

0 comments on commit 785dcfe

Please sign in to comment.