From 785dcfeef0dd7776677ac1159a37ac5599bc2fc3 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Thu, 7 May 2020 16:55:10 +0300 Subject: [PATCH] push/pull: naively transfer run cache (#3746) This is the most simple (aka dumb) implementation for this functionality that is needed to unblock CICD development. --- dvc/command/data_sync.py | 21 ++++++++++++++++++ dvc/data_cloud.py | 32 ++++++++++++++++++++++------ dvc/repo/__init__.py | 2 +- dvc/repo/fetch.py | 7 +++++- dvc/repo/pull.py | 2 ++ dvc/repo/push.py | 4 +++- dvc/stage/cache.py | 27 +++++++++++++++++++++-- tests/func/test_data_cloud.py | 2 ++ tests/unit/command/test_data_sync.py | 6 ++++++ 9 files changed, 92 insertions(+), 11 deletions(-) diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index 0d5d541118..baee9343ae 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -32,6 +32,7 @@ def run(self): with_deps=self.args.with_deps, force=self.args.force, recursive=self.args.recursive, + run_cache=self.args.run_cache, ) self.log_summary(stats) except (CheckoutError, DvcException) as exc: @@ -54,6 +55,7 @@ def run(self): all_commits=self.args.all_commits, with_deps=self.args.with_deps, recursive=self.args.recursive, + run_cache=self.args.run_cache, ) self.log_summary({"pushed": processed_files_count}) except DvcException: @@ -74,6 +76,7 @@ def run(self): all_commits=self.args.all_commits, with_deps=self.args.with_deps, recursive=self.args.recursive, + run_cache=self.args.run_cache, ) self.log_summary({"fetched": processed_files_count}) except DvcException: @@ -163,6 +166,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Pull cache for subdirectories of the specified directory.", ) + pull_parser.add_argument( + "--run-cache", + action="store_true", + default=False, + help=argparse.SUPPRESS, + ) pull_parser.set_defaults(func=CmdDataPull) # Push @@ -212,6 +221,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Push cache for subdirectories of specified directory.", ) + push_parser.add_argument( + "--run-cache", + action="store_true", + default=False, + help=argparse.SUPPRESS, + ) push_parser.set_defaults(func=CmdDataPush) # Fetch @@ -267,6 +282,12 @@ def add_parser(subparsers, _parent_parser): default=False, help="Fetch cache for subdirectories of specified directory.", ) + fetch_parser.add_argument( + "--run-cache", + action="store_true", + default=False, + help=argparse.SUPPRESS, + ) fetch_parser.set_defaults(func=CmdDataFetch) # Status diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index 185e96ae7b..0fceabb4aa 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -47,7 +47,14 @@ def get_remote(self, name=None, command=""): def _init_remote(self, name): return Remote(self.repo, name=name) - def push(self, cache, jobs=None, remote=None, show_checksums=False): + def push( + self, + cache, + jobs=None, + remote=None, + show_checksums=False, + run_cache=False, + ): """Push data items in a cloud-agnostic way. Args: @@ -58,14 +65,23 @@ def push(self, cache, jobs=None, remote=None, show_checksums=False): show_checksums (bool): show checksums instead of file names in information messages. """ + remote = self.get_remote(remote, "push") + + if run_cache: + self.repo.stage_cache.push(remote) + return self.repo.cache.local.push( - cache, - jobs=jobs, - remote=self.get_remote(remote, "push"), - show_checksums=show_checksums, + cache, jobs=jobs, remote=remote, show_checksums=show_checksums, ) - def pull(self, cache, jobs=None, remote=None, show_checksums=False): + def pull( + self, + cache, + jobs=None, + remote=None, + show_checksums=False, + run_cache=False, + ): """Pull data items in a cloud-agnostic way. Args: @@ -77,6 +93,10 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False): information messages. """ remote = self.get_remote(remote, "pull") + + if run_cache: + self.repo.stage_cache.pull(remote) + downloaded_items_num = self.repo.cache.local.pull( cache, jobs=jobs, remote=remote, show_checksums=show_checksums ) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index bfcab6bc99..166cc13af7 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -106,7 +106,7 @@ def __init__(self, root_dir=None): self.cache = Cache(self) self.cloud = DataCloud(self) - self.stage_cache = StageCache(self.cache.local.cache_dir) + self.stage_cache = StageCache(self) self.metrics = Metrics(self) self.params = Params(self) diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index a06efb5a59..efbd93beba 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -20,6 +20,7 @@ def _fetch( all_tags=False, recursive=False, all_commits=False, + run_cache=False, ): """Download data items from a cloud and imported repositories @@ -50,7 +51,11 @@ def _fetch( try: downloaded += self.cloud.pull( - used, jobs, remote=remote, show_checksums=show_checksums + used, + jobs, + remote=remote, + show_checksums=show_checksums, + run_cache=run_cache, ) except NoRemoteError: if not used.external and used["local"]: diff --git a/dvc/repo/pull.py b/dvc/repo/pull.py index 626ad7e25b..16d1600455 100644 --- a/dvc/repo/pull.py +++ b/dvc/repo/pull.py @@ -17,6 +17,7 @@ def pull( force=False, recursive=False, all_commits=False, + run_cache=False, ): processed_files_count = self._fetch( targets, @@ -27,6 +28,7 @@ def pull( all_commits=all_commits, with_deps=with_deps, recursive=recursive, + run_cache=run_cache, ) stats = self._checkout( targets=targets, with_deps=with_deps, force=force, recursive=recursive diff --git a/dvc/repo/push.py b/dvc/repo/push.py index 43946d37c0..b98714338b 100644 --- a/dvc/repo/push.py +++ b/dvc/repo/push.py @@ -12,6 +12,7 @@ def push( all_tags=False, recursive=False, all_commits=False, + run_cache=False, ): used = self.used_cache( targets, @@ -24,4 +25,5 @@ def push( jobs=jobs, recursive=recursive, ) - return self.cloud.push(used, jobs, remote=remote) + + return self.cloud.push(used, jobs, remote=remote, run_cache=run_cache) diff --git a/dvc/stage/cache.py b/dvc/stage/cache.py index 2991157991..af6528f51f 100644 --- a/dvc/stage/cache.py +++ b/dvc/stage/cache.py @@ -2,6 +2,7 @@ import os import yaml +from funcy import first from voluptuous import Invalid from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA @@ -36,8 +37,9 @@ def _get_stage_hash(stage): class StageCache: - def __init__(self, cache_dir): - self.cache_dir = os.path.join(cache_dir, "runs") + def __init__(self, repo): + self.repo = repo + self.cache_dir = os.path.join(repo.cache.local.cache_dir, "runs") def _get_cache_dir(self, key): return os.path.join(self.cache_dir, key[:2], key) @@ -101,3 +103,24 @@ def restore(self, stage): if not cache: return StageLoader.fill_from_lock(stage, cache) + + @staticmethod + def _transfer(func, from_remote, to_remote): + runs = from_remote.path_info / "runs" + if not from_remote.exists(runs): + return + + for src in from_remote.walk_files(runs): + rel = src.relative_to(from_remote.path_info) + dst = to_remote.path_info / rel + key = dst.parent + # check if any build cache already exists for this key + if to_remote.exists(key) and first(to_remote.walk_files(key)): + continue + func(src, dst) + + def push(self, remote): + return self._transfer(remote.upload, self.repo.cache.local, remote) + + def pull(self, remote): + return self._transfer(remote.download, remote, self.repo.cache.local) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 1ac91a1df2..e17633b1b7 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -826,6 +826,8 @@ def test_pipeline_file_target_ops(tmp_dir, dvc, local_remote, run_copy): outs = ["foo", "bar", "lorem", "ipsum", "baz", "lorem2"] + remove(dvc.stage_cache.cache_dir) + dvc.push() # each one's a copy of other, hence 3 assert len(recurse_list_dir(fspath_py35(local_remote))) == 3 diff --git a/tests/unit/command/test_data_sync.py b/tests/unit/command/test_data_sync.py index dab1379eef..bb4433a11e 100644 --- a/tests/unit/command/test_data_sync.py +++ b/tests/unit/command/test_data_sync.py @@ -17,6 +17,7 @@ def test_fetch(mocker): "--all-commits", "--with-deps", "--recursive", + "--run-cache", ] ) assert cli_args.func == CmdDataFetch @@ -35,6 +36,7 @@ def test_fetch(mocker): all_commits=True, with_deps=True, recursive=True, + run_cache=True, ) @@ -54,6 +56,7 @@ def test_pull(mocker): "--with-deps", "--force", "--recursive", + "--run-cache", ] ) assert cli_args.func == CmdDataPull @@ -73,6 +76,7 @@ def test_pull(mocker): with_deps=True, force=True, recursive=True, + run_cache=True, ) @@ -91,6 +95,7 @@ def test_push(mocker): "--all-commits", "--with-deps", "--recursive", + "--run-cache", ] ) assert cli_args.func == CmdDataPush @@ -109,4 +114,5 @@ def test_push(mocker): all_commits=True, with_deps=True, recursive=True, + run_cache=True, )