Skip to content

Commit

Permalink
push/pull: naively transfer run cache
Browse files Browse the repository at this point in the history
This is the most simple (aka dumb) implementation for this
functionality that is needed to unblock CICD development.
  • Loading branch information
efiop committed May 7, 2020
1 parent 6a406ab commit bf89073
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 11 deletions.
21 changes: 21 additions & 0 deletions dvc/command/data_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def run(self):
with_deps=self.args.with_deps,
force=self.args.force,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary(stats)
except (CheckoutError, DvcException) as exc:
Expand All @@ -54,6 +55,7 @@ def run(self):
all_commits=self.args.all_commits,
with_deps=self.args.with_deps,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary({"pushed": processed_files_count})
except DvcException:
Expand All @@ -74,6 +76,7 @@ def run(self):
all_commits=self.args.all_commits,
with_deps=self.args.with_deps,
recursive=self.args.recursive,
run_cache=self.args.run_cache,
)
self.log_summary({"fetched": processed_files_count})
except DvcException:
Expand Down Expand Up @@ -163,6 +166,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Pull cache for subdirectories of the specified directory.",
)
pull_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
pull_parser.set_defaults(func=CmdDataPull)

# Push
Expand Down Expand Up @@ -212,6 +221,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Push cache for subdirectories of specified directory.",
)
push_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
push_parser.set_defaults(func=CmdDataPush)

# Fetch
Expand Down Expand Up @@ -267,6 +282,12 @@ def add_parser(subparsers, _parent_parser):
default=False,
help="Fetch cache for subdirectories of specified directory.",
)
fetch_parser.add_argument(
"--run-cache",
action="store_true",
default=False,
help=argparse.SUPPRESS,
)
fetch_parser.set_defaults(func=CmdDataFetch)

# Status
Expand Down
32 changes: 26 additions & 6 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ def get_remote(self, name=None, command="<command>"):
def _init_remote(self, name):
return Remote(self.repo, name=name)

def push(self, cache, jobs=None, remote=None, show_checksums=False):
def push(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
):
"""Push data items in a cloud-agnostic way.
Args:
Expand All @@ -58,14 +65,23 @@ def push(self, cache, jobs=None, remote=None, show_checksums=False):
show_checksums (bool): show checksums instead of file names in
information messages.
"""
remote = self.get_remote(remote, "push")

if run_cache:
self.repo.stage_cache.push(remote)

return self.repo.cache.local.push(
cache,
jobs=jobs,
remote=self.get_remote(remote, "push"),
show_checksums=show_checksums,
cache, jobs=jobs, remote=remote, show_checksums=show_checksums,
)

def pull(self, cache, jobs=None, remote=None, show_checksums=False):
def pull(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
):
"""Pull data items in a cloud-agnostic way.
Args:
Expand All @@ -77,6 +93,10 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False):
information messages.
"""
remote = self.get_remote(remote, "pull")

if run_cache:
self.repo.stage_cache.pull(remote)

downloaded_items_num = self.repo.cache.local.pull(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums
)
Expand Down
2 changes: 1 addition & 1 deletion dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(self, root_dir=None):
self.cache = Cache(self)
self.cloud = DataCloud(self)

self.stage_cache = StageCache(self.cache.local.cache_dir)
self.stage_cache = StageCache(self)

self.metrics = Metrics(self)
self.params = Params(self)
Expand Down
7 changes: 6 additions & 1 deletion dvc/repo/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def _fetch(
all_tags=False,
recursive=False,
all_commits=False,
run_cache=False,
):
"""Download data items from a cloud and imported repositories
Expand Down Expand Up @@ -50,7 +51,11 @@ def _fetch(

try:
downloaded += self.cloud.pull(
used, jobs, remote=remote, show_checksums=show_checksums
used,
jobs,
remote=remote,
show_checksums=show_checksums,
run_cache=run_cache,
)
except NoRemoteError:
if not used.external and used["local"]:
Expand Down
2 changes: 2 additions & 0 deletions dvc/repo/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def pull(
force=False,
recursive=False,
all_commits=False,
run_cache=False,
):
processed_files_count = self._fetch(
targets,
Expand All @@ -27,6 +28,7 @@ def pull(
all_commits=all_commits,
with_deps=with_deps,
recursive=recursive,
run_cache=run_cache,
)
stats = self._checkout(
targets=targets, with_deps=with_deps, force=force, recursive=recursive
Expand Down
4 changes: 3 additions & 1 deletion dvc/repo/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def push(
all_tags=False,
recursive=False,
all_commits=False,
run_cache=False,
):
used = self.used_cache(
targets,
Expand All @@ -24,4 +25,5 @@ def push(
jobs=jobs,
recursive=recursive,
)
return self.cloud.push(used, jobs, remote=remote)

return self.cloud.push(used, jobs, remote=remote, run_cache=run_cache)
27 changes: 25 additions & 2 deletions dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os

import yaml
from funcy import first
from voluptuous import Invalid

from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
Expand Down Expand Up @@ -36,8 +37,9 @@ def _get_stage_hash(stage):


class StageCache:
def __init__(self, cache_dir):
self.cache_dir = os.path.join(cache_dir, "runs")
def __init__(self, repo):
self.repo = repo
self.cache_dir = os.path.join(repo.cache.local.cache_dir, "runs")

def _get_cache_dir(self, key):
return os.path.join(self.cache_dir, key[:2], key)
Expand Down Expand Up @@ -101,3 +103,24 @@ def restore(self, stage):
if not cache:
return
StageLoader.fill_from_lock(stage, cache)

@staticmethod
def _transfer(func, from_remote, to_remote):
runs = from_remote.path_info / "runs"
if not from_remote.exists(runs):
return

for src in from_remote.walk_files(runs):
rel = src.relative_to(from_remote.path_info)
dst = to_remote.path_info / rel
key = dst.parent
# check if any build cache already exists for this key
if to_remote.exists(key) and first(to_remote.walk_files(key)):
continue
func(src, dst)

def push(self, remote):
return self._transfer(remote.upload, self.repo.cache.local, remote)

def pull(self, remote):
return self._transfer(remote.download, remote, self.repo.cache.local)
2 changes: 2 additions & 0 deletions tests/func/test_data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ def test_pipeline_file_target_ops(tmp_dir, dvc, local_remote, run_copy):

outs = ["foo", "bar", "lorem", "ipsum", "baz", "lorem2"]

remove(dvc.stage_cache.cache_dir)

dvc.push()
# each one's a copy of other, hence 3
assert len(recurse_list_dir(fspath_py35(local_remote))) == 3
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/command/test_data_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_fetch(mocker):
"--all-commits",
"--with-deps",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataFetch
Expand All @@ -35,6 +36,7 @@ def test_fetch(mocker):
all_commits=True,
with_deps=True,
recursive=True,
run_cache=True,
)


Expand All @@ -54,6 +56,7 @@ def test_pull(mocker):
"--with-deps",
"--force",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataPull
Expand All @@ -73,6 +76,7 @@ def test_pull(mocker):
with_deps=True,
force=True,
recursive=True,
run_cache=True,
)


Expand All @@ -91,6 +95,7 @@ def test_push(mocker):
"--all-commits",
"--with-deps",
"--recursive",
"--run-cache",
]
)
assert cli_args.func == CmdDataPush
Expand All @@ -109,4 +114,5 @@ def test_push(mocker):
all_commits=True,
with_deps=True,
recursive=True,
run_cache=True,
)

0 comments on commit bf89073

Please sign in to comment.