Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run: consolidate run-cache logic #4535

Merged
merged 1 commit into from
Sep 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from voluptuous import Invalid

from dvc.cache.local import _log_exceptions
from dvc.exceptions import DvcException
from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
from dvc.utils import dict_sha256, relpath
from dvc.utils.fs import makedirs
Expand All @@ -17,6 +18,11 @@
logger = logging.getLogger(__name__)


class RunCacheNotFoundError(DvcException):
def __init__(self, stage):
super().__init__(f"No run-cache for {stage.addressing}")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only used for internal workflow, not user-facing.



def _get_cache_hash(cache, key=False):
if key:
cache["outs"] = [out["path"] for out in cache.get("outs", [])]
Expand Down Expand Up @@ -142,18 +148,34 @@ def save(self, stage):
makedirs(dpath, exist_ok=True)
dump_yaml(path, cache)

def is_cached(self, stage):
return bool(self._load(stage))

def restore(self, stage):
def _restore(self, stage):
stage.save_deps()
cache = self._load(stage)
if not cache:
return
StageLoader.fill_from_lock(stage, cache)
raise RunCacheNotFoundError(stage)

StageLoader.fill_from_lock(stage, cache)
for out in self._uncached_outs(stage, cache):
out.checkout()

if not stage.outs_cached():
raise RunCacheNotFoundError(stage)

def restore(self, stage, run_cache=True):
if stage.is_callback or stage.always_changed:
raise RunCacheNotFoundError(stage)

if not stage.already_cached():
if not run_cache: # backward compatibility
raise RunCacheNotFoundError(stage)
self._restore(stage)

logger.info(
"Stage '%s' is cached - skipping run, checking out outputs",
stage.addressing,
)
stage.checkout()

@staticmethod
def _transfer(func, from_remote, to_remote):
ret = []
Expand Down
36 changes: 6 additions & 30 deletions dvc/stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,39 +81,15 @@ def cmd_run(stage, *args, **kwargs):
raise StageCmdFailedError(stage.cmd, retcode)


def _is_cached(stage):
cached = (
not stage.is_callback
and not stage.always_changed
and stage.already_cached()
)
if cached:
logger.info("Stage '%s' is cached", stage.addressing)
return cached


def restored_from_cache(stage):
stage.save_deps()
stage_cache = stage.repo.stage_cache
if not stage_cache.is_cached(stage):
return False
# restore stage from build cache
stage_cache.restore(stage)
restored = stage.outs_cached()
if restored:
logger.info("Restored stage '%s' from run-cache", stage.addressing)
return restored


def run_stage(stage, dry=False, force=False, run_cache=False):
if not (dry or force):
stage_cached = _is_cached(stage) or (
run_cache and restored_from_cache(stage)
)
if stage_cached:
logger.info("Skipping run, checking out outputs")
stage.checkout()
from .cache import RunCacheNotFoundError

try:
stage.repo.stage_cache.restore(stage, run_cache=run_cache)
return
except RunCacheNotFoundError:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using exception flow control to avoid unnecessary calls.

pass

callback_str = "callback " if stage.is_callback else ""
logger.info(
Expand Down
16 changes: 8 additions & 8 deletions tests/func/test_run_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_restore(tmp_dir, dvc, run_copy, mocker):

(stage,) = dvc.reproduce("copy-foo-bar")

mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)
mock_run.assert_not_called()
assert (tmp_dir / "bar").exists() and not (tmp_dir / "foo").unlink()
assert (tmp_dir / PIPELINE_LOCK).exists()
Expand All @@ -46,7 +46,7 @@ def test_save(tmp_dir, dvc, run_copy):
stage = run_copy("foo", "bar", name="copy-foo-bar")
assert _recurse_count_files(run_cache_dir) == 1
with dvc.state:
assert dvc.stage_cache.is_cached(stage)
assert dvc.stage_cache._load(stage)


def test_do_not_save_on_no_exec_and_dry(tmp_dir, dvc, run_copy):
Expand All @@ -58,13 +58,13 @@ def test_do_not_save_on_no_exec_and_dry(tmp_dir, dvc, run_copy):

assert _recurse_count_files(run_cache_dir) == 0
with dvc.state:
assert not dvc.stage_cache.is_cached(stage)
assert not dvc.stage_cache._load(stage)

(stage,) = dvc.reproduce("copy-foo-bar", dry=True)

assert _recurse_count_files(run_cache_dir) == 0
with dvc.state:
assert not dvc.stage_cache.is_cached(stage)
assert not dvc.stage_cache._load(stage)


def test_uncached_outs_are_cached(tmp_dir, dvc, run_copy):
Expand Down Expand Up @@ -103,7 +103,7 @@ def test_memory_for_multiple_runs_of_same_stage(
assert (tmp_dir / PIPELINE_LOCK).exists()
assert (tmp_dir / "bar").read_text() == "foobar"
mock_run.assert_not_called()
mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)
mock_restore.reset_mock()

(tmp_dir / PIPELINE_LOCK).unlink()
Expand All @@ -112,7 +112,7 @@ def test_memory_for_multiple_runs_of_same_stage(

assert (tmp_dir / "bar").read_text() == "foo"
mock_run.assert_not_called()
mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)
assert (tmp_dir / "bar").exists() and not (tmp_dir / "foo").unlink()
assert (tmp_dir / PIPELINE_LOCK).exists()

Expand Down Expand Up @@ -141,12 +141,12 @@ def test_memory_runs_of_multiple_stages(tmp_dir, dvc, run_copy, mocker):
assert (tmp_dir / "foo.bak").read_text() == "foo"
assert (tmp_dir / PIPELINE_LOCK).exists()
mock_run.assert_not_called()
mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)
mock_restore.reset_mock()

(stage,) = dvc.reproduce("backup-bar")

assert (tmp_dir / "bar.bak").read_text() == "bar"
assert (tmp_dir / PIPELINE_LOCK).exists()
mock_run.assert_not_called()
mock_restore.assert_called_once_with(stage)
mock_restore.assert_called_once_with(stage, run_cache=True)