Move the "is this stage cached?" decision into the stage cache's restore().

Summary of what the hunks below do:

  * dvc/stage/cache.py
      - add RunCacheNotFoundError (a DvcException subclass);
      - remove is_cached();
      - rename the old restore() body to _restore(); it now calls
        stage.save_deps() first and raises RunCacheNotFoundError when no
        cache entry is loaded or when the outputs are still not cached
        after checkout;
      - add restore(stage, run_cache=True), which raises
        RunCacheNotFoundError for callback / always-changed stages, and for
        stages that are not already cached when run_cache is false;
        otherwise it logs and calls stage.checkout().
  * dvc/stage/run.py
      - remove _is_cached() and restored_from_cache();
      - run_stage() calls stage.repo.stage_cache.restore() and falls through
        to a normal run when RunCacheNotFoundError is raised.
  * tests/func/test_run_cache.py
      - expect restore() to be called with run_cache=True;
      - use _load() in place of the removed is_cached().

NOTE(review): this patch was rebuilt from a copy whose line breaks and
indentation had been collapsed. Blank context lines were placed so that each
hunk matches the line counts in its @@ header, and indentation follows the
usual 4-space Python layout -- verify against the target tree before applying.

diff --git a/dvc/stage/cache.py b/dvc/stage/cache.py
index ee102723e2..1569c652f6 100644
--- a/dvc/stage/cache.py
+++ b/dvc/stage/cache.py
@@ -6,6 +6,7 @@
 from voluptuous import Invalid
 
 from dvc.cache.local import _log_exceptions
+from dvc.exceptions import DvcException
 from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
 from dvc.utils import dict_sha256, relpath
 from dvc.utils.fs import makedirs
@@ -17,6 +18,11 @@
 logger = logging.getLogger(__name__)
 
 
+class RunCacheNotFoundError(DvcException):
+    def __init__(self, stage):
+        super().__init__(f"No run-cache for {stage.addressing}")
+
+
 def _get_cache_hash(cache, key=False):
     if key:
         cache["outs"] = [out["path"] for out in cache.get("outs", [])]
@@ -142,18 +148,34 @@ def save(self, stage):
         makedirs(dpath, exist_ok=True)
         dump_yaml(path, cache)
 
-    def is_cached(self, stage):
-        return bool(self._load(stage))
-
-    def restore(self, stage):
+    def _restore(self, stage):
+        stage.save_deps()
         cache = self._load(stage)
         if not cache:
-            return
-        StageLoader.fill_from_lock(stage, cache)
+            raise RunCacheNotFoundError(stage)
 
+        StageLoader.fill_from_lock(stage, cache)
         for out in self._uncached_outs(stage, cache):
             out.checkout()
 
+        if not stage.outs_cached():
+            raise RunCacheNotFoundError(stage)
+
+    def restore(self, stage, run_cache=True):
+        if stage.is_callback or stage.always_changed:
+            raise RunCacheNotFoundError(stage)
+
+        if not stage.already_cached():
+            if not run_cache:  # backward compatibility
+                raise RunCacheNotFoundError(stage)
+            self._restore(stage)
+
+        logger.info(
+            "Stage '%s' is cached - skipping run, checking out outputs",
+            stage.addressing,
+        )
+        stage.checkout()
+
     @staticmethod
     def _transfer(func, from_remote, to_remote):
         ret = []
diff --git a/dvc/stage/run.py b/dvc/stage/run.py
index 2cd30c5f67..71386509e1 100644
--- a/dvc/stage/run.py
+++ b/dvc/stage/run.py
@@ -81,39 +81,15 @@ def cmd_run(stage, *args, **kwargs):
         raise StageCmdFailedError(stage.cmd, retcode)
 
 
-def _is_cached(stage):
-    cached = (
-        not stage.is_callback
-        and not stage.always_changed
-        and stage.already_cached()
-    )
-    if cached:
-        logger.info("Stage '%s' is cached", stage.addressing)
-    return cached
-
-
-def restored_from_cache(stage):
-    stage.save_deps()
-    stage_cache = stage.repo.stage_cache
-    if not stage_cache.is_cached(stage):
-        return False
-    # restore stage from build cache
-    stage_cache.restore(stage)
-    restored = stage.outs_cached()
-    if restored:
-        logger.info("Restored stage '%s' from run-cache", stage.addressing)
-    return restored
-
-
 def run_stage(stage, dry=False, force=False, run_cache=False):
     if not (dry or force):
-        stage_cached = _is_cached(stage) or (
-            run_cache and restored_from_cache(stage)
-        )
-        if stage_cached:
-            logger.info("Skipping run, checking out outputs")
-            stage.checkout()
+        from .cache import RunCacheNotFoundError
+
+        try:
+            stage.repo.stage_cache.restore(stage, run_cache=run_cache)
             return
+        except RunCacheNotFoundError:
+            pass
 
     callback_str = "callback " if stage.is_callback else ""
     logger.info(
diff --git a/tests/func/test_run_cache.py b/tests/func/test_run_cache.py
index f77a729cc8..55ecfaa402 100644
--- a/tests/func/test_run_cache.py
+++ b/tests/func/test_run_cache.py
@@ -32,7 +32,7 @@ def test_restore(tmp_dir, dvc, run_copy, mocker):
 
     (stage,) = dvc.reproduce("copy-foo-bar")
 
-    mock_restore.assert_called_once_with(stage)
+    mock_restore.assert_called_once_with(stage, run_cache=True)
     mock_run.assert_not_called()
     assert (tmp_dir / "bar").exists() and not (tmp_dir / "foo").unlink()
     assert (tmp_dir / PIPELINE_LOCK).exists()
@@ -46,7 +46,7 @@ def test_save(tmp_dir, dvc, run_copy):
     stage = run_copy("foo", "bar", name="copy-foo-bar")
     assert _recurse_count_files(run_cache_dir) == 1
     with dvc.state:
-        assert dvc.stage_cache.is_cached(stage)
+        assert dvc.stage_cache._load(stage)
 
 
 def test_do_not_save_on_no_exec_and_dry(tmp_dir, dvc, run_copy):
@@ -58,13 +58,13 @@ def test_do_not_save_on_no_exec_and_dry(tmp_dir, dvc, run_copy):
 
     assert _recurse_count_files(run_cache_dir) == 0
     with dvc.state:
-        assert not dvc.stage_cache.is_cached(stage)
+        assert not dvc.stage_cache._load(stage)
 
     (stage,) = dvc.reproduce("copy-foo-bar", dry=True)
 
     assert _recurse_count_files(run_cache_dir) == 0
     with dvc.state:
-        assert not dvc.stage_cache.is_cached(stage)
+        assert not dvc.stage_cache._load(stage)
 
 
 def test_uncached_outs_are_cached(tmp_dir, dvc, run_copy):
@@ -103,7 +103,7 @@ def test_memory_for_multiple_runs_of_same_stage(
     assert (tmp_dir / PIPELINE_LOCK).exists()
     assert (tmp_dir / "bar").read_text() == "foobar"
     mock_run.assert_not_called()
-    mock_restore.assert_called_once_with(stage)
+    mock_restore.assert_called_once_with(stage, run_cache=True)
     mock_restore.reset_mock()
 
     (tmp_dir / PIPELINE_LOCK).unlink()
@@ -112,7 +112,7 @@ def test_memory_for_multiple_runs_of_same_stage(
 
     assert (tmp_dir / "bar").read_text() == "foo"
     mock_run.assert_not_called()
-    mock_restore.assert_called_once_with(stage)
+    mock_restore.assert_called_once_with(stage, run_cache=True)
     assert (tmp_dir / "bar").exists() and not (tmp_dir / "foo").unlink()
     assert (tmp_dir / PIPELINE_LOCK).exists()
 
@@ -141,7 +141,7 @@ def test_memory_runs_of_multiple_stages(tmp_dir, dvc, run_copy, mocker):
     assert (tmp_dir / "foo.bak").read_text() == "foo"
     assert (tmp_dir / PIPELINE_LOCK).exists()
     mock_run.assert_not_called()
-    mock_restore.assert_called_once_with(stage)
+    mock_restore.assert_called_once_with(stage, run_cache=True)
     mock_restore.reset_mock()
 
     (stage,) = dvc.reproduce("backup-bar")
@@ -149,4 +149,4 @@ def test_memory_runs_of_multiple_stages(tmp_dir, dvc, run_copy, mocker):
     assert (tmp_dir / "bar.bak").read_text() == "bar"
     assert (tmp_dir / PIPELINE_LOCK).exists()
     mock_run.assert_not_called()
-    mock_restore.assert_called_once_with(stage)
+    mock_restore.assert_called_once_with(stage, run_cache=True)