Skip to content

Commit

Permalink
stage: cache: use lockfiles
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed May 1, 2020
1 parent 22c60dd commit 0b2569a
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 43 deletions.
1 change: 1 addition & 0 deletions dvc/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@

COMPILED_SINGLE_STAGE_SCHEMA = Schema(SINGLE_STAGE_SCHEMA)
COMPILED_MULTI_STAGE_SCHEMA = Schema(MULTI_STAGE_SCHEMA)
COMPILED_LOCK_FILE_STAGE_SCHEMA = Schema(LOCK_FILE_STAGE_SCHEMA)
COMPILED_LOCKFILE_SCHEMA = Schema(LOCKFILE_SCHEMA)
10 changes: 7 additions & 3 deletions dvc/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ def to_pipeline_file(stage: "PipelineStage"):
}


def to_lockfile(stage: "PipelineStage"):
def to_single_stage_lockfile(stage: "Stage") -> dict:
assert stage.cmd
assert stage.name

res = OrderedDict([("cmd", stage.cmd)])
params, deps = get_params_deps(stage)
Expand All @@ -112,7 +111,12 @@ def to_lockfile(stage: "PipelineStage"):
if outs:
res["outs"] = outs

return {stage.name: res}
return res


def to_lockfile(stage: "PipelineStage") -> dict:
assert stage.name
return {stage.name: to_single_stage_lockfile(stage)}


def to_single_stage_file(stage: "Stage"):
Expand Down
27 changes: 22 additions & 5 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,6 @@ def run(

else:
if not dry:
if not force and not ignore_build_cache:
self.repo.stage_cache.restore(self)

if (
not force
and not self.is_callback
Expand All @@ -661,8 +658,28 @@ def run(
self.checkout()
else:
self._save_deps()
logger.info("Running command:\n\t{}".format(self.cmd))
self._run()
if (
not force
and not ignore_build_cache
and self.repo.stage_cache.is_cached(self)
):
logger.info("Stage is cached, skipping.")
self.repo.stage_cache.restore(self)
if all(
not out.changed_cache()
if out.use_cache
else not out.changed()
for out in self.outs
):
self.checkout()
else:
logger.info(
"Running command:\n\t{}".format(self.cmd)
)
self._run()
else:
logger.info("Running command:\n\t{}".format(self.cmd))
self._run()

if not dry:
self.save()
Expand Down
55 changes: 23 additions & 32 deletions dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,24 @@
import yaml
import logging

from voluptuous import Schema, Required, Invalid
from voluptuous import Invalid

from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
from dvc.serialize import to_single_stage_lockfile
from dvc.utils.fs import makedirs
from dvc.utils import relpath, dict_sha256
from dvc.utils.stage import dump_stage_file

logger = logging.getLogger(__name__)

SCHEMA = Schema(
{
Required("cmd"): str,
Required("deps"): {str: str},
Required("outs"): {str: str},
}
)


def _get_cache_hash(cache, key=False):
if key:
outs = [out["path"] for out in cache["outs"]]
else:
outs = cache["outs"]
return dict_sha256(
{
"cmd": cache["cmd"],
"deps": cache["deps"],
"outs": list(cache["outs"].keys()) if key else cache["outs"],
}
{"cmd": cache["cmd"], "deps": cache["deps"], "outs": outs}
)


Expand All @@ -40,15 +35,7 @@ def _get_stage_hash(stage):
if out.scheme != "local" or not out.def_path or out.persist:
return None

return _get_cache_hash(_create_cache(stage), key=True)


def _create_cache(stage):
return {
"cmd": stage.cmd,
"deps": {dep.def_path: dep.get_checksum() for dep in stage.deps},
"outs": {out.def_path: out.get_checksum() for out in stage.outs},
}
return _get_cache_hash(to_single_stage_lockfile(stage), key=True)


class StageCache:
Expand All @@ -66,7 +53,7 @@ def _load_cache(self, key, value):

try:
with open(path, "r") as fobj:
return SCHEMA(yaml.safe_load(fobj))
return COMPILED_LOCK_FILE_STAGE_SCHEMA(yaml.safe_load(fobj))
except FileNotFoundError:
return None
except (yaml.error.YAMLError, Invalid):
Expand Down Expand Up @@ -95,30 +82,34 @@ def save(self, stage):
if not cache_key:
return

cache = _create_cache(stage)
cache = to_single_stage_lockfile(stage)
cache_value = _get_cache_hash(cache)

if self._load_cache(cache_key, cache_value):
return

# sanity check
SCHEMA(cache)
COMPILED_LOCK_FILE_STAGE_SCHEMA(cache)

path = self._get_cache_path(cache_key, cache_value)
dpath = os.path.dirname(path)
makedirs(dpath, exist_ok=True)
with open(path, "w+") as fobj:
yaml.dump(cache, fobj)
dump_stage_file(path, cache)

def is_cached(self, stage):
return bool(self._load(stage))

def restore(self, stage):
cache = self._load(stage)
if not cache:
return

deps = {dep.def_path: dep for dep in stage.deps}
for def_path, checksum in cache["deps"].items():
deps[def_path].checksum = checksum
for entry in cache["deps"]:
dep = deps[entry["path"]]
dep.checksum = entry[dep.checksum_type]

outs = {out.def_path: out for out in stage.outs}
for def_path, checksum in cache["outs"].items():
outs[def_path].checksum = checksum
for entry in cache["outs"]:
out = outs[entry["path"]]
out.checksum = entry[out.checksum_type]
6 changes: 3 additions & 3 deletions tests/unit/test_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ def test_stage_cache(tmp_dir, dvc, run_copy, mocker):

cache_dir = os.path.join(
dvc.stage_cache.cache_dir,
"ec",
"ec5b6d8dea9136dbb62d93a95c777f87e6c54b0a6bee839554acb99fdf23d2b1",
"75",
"75f8a9097d76293ff4b3684d52e4ad0e83686d31196f27eb0b2ea9fd5085565e",
)
cache_file = os.path.join(
cache_dir,
"09f9eb17fdb1ee7f8566b3c57394cee060eaf28075244bc6058612ac91fdf04a",
"c1747e52065bc7801262fdaed4d63f5775e5da304008bd35e2fea4e6b1ccb272",
)

assert os.path.isdir(cache_dir)
Expand Down

0 comments on commit 0b2569a

Please sign in to comment.