Skip to content

Commit

Permalink
run: consolidate run-cache logic
Browse files: browse the repository at this point in the history
Existing lock files are just an edge case of run-cache.

Prerequisite for #4223
  • Loading branch information
efiop committed Sep 6, 2020
1 parent 636a019 commit d42e3a3
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 44 deletions.
34 changes: 28 additions & 6 deletions dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from voluptuous import Invalid

from dvc.cache.local import _log_exceptions
from dvc.exceptions import DvcException
from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
from dvc.utils import dict_sha256, relpath
from dvc.utils.fs import makedirs
Expand All @@ -17,6 +18,11 @@
logger = logging.getLogger(__name__)


class RunCacheNotFoundError(DvcException):
    """Signal that no run-cache entry could be used for a stage.

    The error message names the stage through its ``addressing`` attribute.
    """

    def __init__(self, stage):
        message = f"No run-cache for {stage.addressing}"
        super().__init__(message)


def _get_cache_hash(cache, key=False):
if key:
cache["outs"] = [out["path"] for out in cache.get("outs", [])]
Expand Down Expand Up @@ -142,18 +148,34 @@ def save(self, stage):
makedirs(dpath, exist_ok=True)
dump_yaml(path, cache)

def is_cached(self, stage):
    """Return True when ``self._load(stage)`` yields a truthy entry."""
    entry = self._load(stage)
    return True if entry else False

def restore(self, stage):
def _restore(self, stage):
stage.save_deps()
cache = self._load(stage)
if not cache:
return
StageLoader.fill_from_lock(stage, cache)
raise RunCacheNotFoundError(stage)

StageLoader.fill_from_lock(stage, cache)
for out in self._uncached_outs(stage, cache):
out.checkout()

if not stage.outs_cached():
raise RunCacheNotFoundError(stage)

def restore(self, stage, run_cache=True):
    """Check out the outputs of ``stage`` instead of running it.

    Args:
        stage: the stage to restore.
        run_cache: when false, ``self._restore`` is never consulted; only a
            stage for which ``stage.already_cached()`` is true is accepted.

    Raises:
        RunCacheNotFoundError: if the stage is a callback or always-changed
            stage, or if it is not already cached and ``run_cache`` is
            false. Anything raised by ``self._restore`` propagates as well.
    """
    never_skippable = stage.is_callback or stage.always_changed
    if never_skippable:
        raise RunCacheNotFoundError(stage)

    needs_restore = not stage.already_cached()
    if needs_restore and not run_cache:  # backward compatibility
        raise RunCacheNotFoundError(stage)
    if needs_restore:
        self._restore(stage)

    stage_name = stage.addressing
    logger.info(
        "Stage '%s' is cached - skipping run, checking out outputs",
        stage_name,
    )
    stage.checkout()

@staticmethod
def _transfer(func, from_remote, to_remote):
ret = []
Expand Down
36 changes: 6 additions & 30 deletions dvc/stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,39 +81,15 @@ def cmd_run(stage, *args, **kwargs):
raise StageCmdFailedError(stage.cmd, retcode)


def _is_cached(stage):
    """Tell whether ``stage`` can be skipped because it is already cached.

    Callback stages and always-changed stages are never reported as cached.
    Otherwise the result of ``stage.already_cached()`` is returned, and a
    truthy result is logged at info level.
    """
    if stage.is_callback or stage.always_changed:
        return False

    result = stage.already_cached()
    if result:
        logger.info("Stage '%s' is cached", stage.addressing)
    return result


def restored_from_cache(stage):
    """Try to restore ``stage`` from the repo's stage cache.

    Saves the stage's deps first, then returns False if the stage cache has
    no entry for the stage. Otherwise restores the stage and returns the
    result of ``stage.outs_cached()``, logging at info level when it is
    truthy.
    """
    stage.save_deps()
    cache = stage.repo.stage_cache
    if not cache.is_cached(stage):
        return False

    # restore stage from build cache
    cache.restore(stage)
    outs_ok = stage.outs_cached()
    if not outs_ok:
        return outs_ok

    logger.info("Restored stage '%s' from run-cache", stage.addressing)
    return outs_ok


def run_stage(stage, dry=False, force=False, run_cache=False):
if not (dry or force):
stage_cached = _is_cached(stage) or (
run_cache and restored_from_cache(stage)
)
if stage_cached:
logger.info("Skipping run, checking out outputs")
stage.checkout()
from .cache import RunCacheNotFoundError

try:
stage.repo.stage_cache.restore(stage, run_cache=run_cache)
return
except RunCacheNotFoundError:
pass

callback_str = "callback " if stage.is_callback else ""
logger.info(
Expand Down
16 changes: 8 additions & 8 deletions tests/func/test_run_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_restore(tmp_dir, dvc, run_copy, mocker):

(stage,) = dvc.reproduce("copy-foo-bar")

mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)
mock_run.assert_not_called()
assert (tmp_dir / "bar").exists() and not (tmp_dir / "foo").unlink()
assert (tmp_dir / PIPELINE_LOCK).exists()
Expand All @@ -46,7 +46,7 @@ def test_save(tmp_dir, dvc, run_copy):
stage = run_copy("foo", "bar", name="copy-foo-bar")
assert _recurse_count_files(run_cache_dir) == 1
with dvc.state:
assert dvc.stage_cache.is_cached(stage)
assert dvc.stage_cache._load(stage)


def test_do_not_save_on_no_exec_and_dry(tmp_dir, dvc, run_copy):
Expand All @@ -58,13 +58,13 @@ def test_do_not_save_on_no_exec_and_dry(tmp_dir, dvc, run_copy):

assert _recurse_count_files(run_cache_dir) == 0
with dvc.state:
assert not dvc.stage_cache.is_cached(stage)
assert not dvc.stage_cache._load(stage)

(stage,) = dvc.reproduce("copy-foo-bar", dry=True)

assert _recurse_count_files(run_cache_dir) == 0
with dvc.state:
assert not dvc.stage_cache.is_cached(stage)
assert not dvc.stage_cache._load(stage)


def test_uncached_outs_are_cached(tmp_dir, dvc, run_copy):
Expand Down Expand Up @@ -103,7 +103,7 @@ def test_memory_for_multiple_runs_of_same_stage(
assert (tmp_dir / PIPELINE_LOCK).exists()
assert (tmp_dir / "bar").read_text() == "foobar"
mock_run.assert_not_called()
mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)
mock_restore.reset_mock()

(tmp_dir / PIPELINE_LOCK).unlink()
Expand All @@ -112,7 +112,7 @@ def test_memory_for_multiple_runs_of_same_stage(

assert (tmp_dir / "bar").read_text() == "foo"
mock_run.assert_not_called()
mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)
assert (tmp_dir / "bar").exists() and not (tmp_dir / "foo").unlink()
assert (tmp_dir / PIPELINE_LOCK).exists()

Expand Down Expand Up @@ -141,12 +141,12 @@ def test_memory_runs_of_multiple_stages(tmp_dir, dvc, run_copy, mocker):
assert (tmp_dir / "foo.bak").read_text() == "foo"
assert (tmp_dir / PIPELINE_LOCK).exists()
mock_run.assert_not_called()
mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)
mock_restore.reset_mock()

(stage,) = dvc.reproduce("backup-bar")

assert (tmp_dir / "bar.bak").read_text() == "bar"
assert (tmp_dir / PIPELINE_LOCK).exists()
mock_run.assert_not_called()
mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)

0 comments on commit d42e3a3

Please sign in to comment.