diff --git a/dvc/schema.py b/dvc/schema.py index d6a24d1473..4e06197873 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -42,4 +42,5 @@ COMPILED_SINGLE_STAGE_SCHEMA = Schema(SINGLE_STAGE_SCHEMA) COMPILED_MULTI_STAGE_SCHEMA = Schema(MULTI_STAGE_SCHEMA) +COMPILED_LOCK_FILE_STAGE_SCHEMA = Schema(LOCK_FILE_STAGE_SCHEMA) COMPILED_LOCKFILE_SCHEMA = Schema(LOCKFILE_SCHEMA) diff --git a/dvc/serialize.py b/dvc/serialize.py index ec004e5860..09172f7765 100644 --- a/dvc/serialize.py +++ b/dvc/serialize.py @@ -90,9 +90,8 @@ def to_pipeline_file(stage: "PipelineStage"): } -def to_lockfile(stage: "PipelineStage"): +def to_single_stage_lockfile(stage: "Stage") -> dict: assert stage.cmd - assert stage.name res = OrderedDict([("cmd", stage.cmd)]) params, deps = get_params_deps(stage) @@ -112,7 +111,12 @@ def to_lockfile(stage: "PipelineStage"): if outs: res["outs"] = outs - return {stage.name: res} + return res + + +def to_lockfile(stage: "PipelineStage") -> dict: + assert stage.name + return {stage.name: to_single_stage_lockfile(stage)} def to_single_stage_file(stage: "Stage"): diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index e1c629abd8..e67daf35b8 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -648,9 +648,6 @@ def run( else: if not dry: - if not force and not ignore_build_cache: - self.repo.stage_cache.restore(self) - if ( not force and not self.is_callback @@ -661,8 +658,28 @@ def run( self.checkout() else: self._save_deps() - logger.info("Running command:\n\t{}".format(self.cmd)) - self._run() + if ( + not force + and not ignore_build_cache + and self.repo.stage_cache.is_cached(self) + ): + logger.info("Stage is cached, skipping.") + self.repo.stage_cache.restore(self) + if all( + not out.changed_cache() + if out.use_cache + else not out.changed() + for out in self.outs + ): + self.checkout() + else: + logger.info( + "Running command:\n\t{}".format(self.cmd) + ) + self._run() + else: + logger.info("Running command:\n\t{}".format(self.cmd)) + self._run() if not dry: self.save() diff --git a/dvc/stage/cache.py b/dvc/stage/cache.py index e4ceacb2c0..17c6038797 100644 --- a/dvc/stage/cache.py +++ b/dvc/stage/cache.py @@ -2,29 +2,24 @@ import yaml import logging -from voluptuous import Schema, Required, Invalid +from voluptuous import Invalid +from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA +from dvc.serialize import to_single_stage_lockfile from dvc.utils.fs import makedirs from dvc.utils import relpath, dict_sha256 +from dvc.utils.stage import dump_stage_file logger = logging.getLogger(__name__) -SCHEMA = Schema( - { - Required("cmd"): str, - Required("deps"): {str: str}, - Required("outs"): {str: str}, - } -) - def _get_cache_hash(cache, key=False): + if key: + outs = [out["path"] for out in cache["outs"]] + else: + outs = cache["outs"] return dict_sha256( - { - "cmd": cache["cmd"], - "deps": cache["deps"], - "outs": list(cache["outs"].keys()) if key else cache["outs"], - } + {"cmd": cache["cmd"], "deps": cache["deps"], "outs": outs} ) @@ -40,15 +35,7 @@ def _get_stage_hash(stage): if out.scheme != "local" or not out.def_path or out.persist: return None - return _get_cache_hash(_create_cache(stage), key=True) - - -def _create_cache(stage): - return { - "cmd": stage.cmd, - "deps": {dep.def_path: dep.get_checksum() for dep in stage.deps}, - "outs": {out.def_path: out.get_checksum() for out in stage.outs}, - } + return _get_cache_hash(to_single_stage_lockfile(stage), key=True) class StageCache: @@ -66,7 +53,7 @@ def _load_cache(self, key, value): try: with open(path, "r") as fobj: - return SCHEMA(yaml.safe_load(fobj)) + return COMPILED_LOCK_FILE_STAGE_SCHEMA(yaml.safe_load(fobj)) except FileNotFoundError: return None except (yaml.error.YAMLError, Invalid): @@ -95,20 +82,22 @@ def save(self, stage): if not cache_key: return - cache = _create_cache(stage) + cache = to_single_stage_lockfile(stage) cache_value = _get_cache_hash(cache) if self._load_cache(cache_key, cache_value): return # sanity check - SCHEMA(cache) + COMPILED_LOCK_FILE_STAGE_SCHEMA(cache) path = self._get_cache_path(cache_key, cache_value) dpath = os.path.dirname(path) makedirs(dpath, exist_ok=True) - with open(path, "w+") as fobj: - yaml.dump(cache, fobj) + dump_stage_file(path, cache) + + def is_cached(self, stage): + return bool(self._load(stage)) def restore(self, stage): cache = self._load(stage) @@ -116,9 +105,11 @@ def restore(self, stage): return deps = {dep.def_path: dep for dep in stage.deps} - for def_path, checksum in cache["deps"].items(): - deps[def_path].checksum = checksum + for entry in cache["deps"]: + dep = deps[entry["path"]] + dep.checksum = entry[dep.checksum_type] outs = {out.def_path: out for out in stage.outs} - for def_path, checksum in cache["outs"].items(): - outs[def_path].checksum = checksum + for entry in cache["outs"]: + out = outs[entry["path"]] + out.checksum = entry[out.checksum_type] diff --git a/tests/unit/test_stage.py b/tests/unit/test_stage.py index 982d549a35..c7d41a6ef0 100644 --- a/tests/unit/test_stage.py +++ b/tests/unit/test_stage.py @@ -116,12 +116,12 @@ def test_stage_cache(tmp_dir, dvc, run_copy, mocker): cache_dir = os.path.join( dvc.stage_cache.cache_dir, - "ec", - "ec5b6d8dea9136dbb62d93a95c777f87e6c54b0a6bee839554acb99fdf23d2b1", + "75", + "75f8a9097d76293ff4b3684d52e4ad0e83686d31196f27eb0b2ea9fd5085565e", ) cache_file = os.path.join( cache_dir, - "09f9eb17fdb1ee7f8566b3c57394cee060eaf28075244bc6058612ac91fdf04a", + "c1747e52065bc7801262fdaed4d63f5775e5da304008bd35e2fea4e6b1ccb272", ) assert os.path.isdir(cache_dir)