Skip to content

Commit

Permalink
dump: lockfile is dumped deterministically
Browse files Browse the repository at this point in the history
The dump is now deterministic and no longer dependent on the order of
entries in the pipeline file: it is sorted based on file names in outs,
deps and params. The params inside each file are also sorted by name.
However, the values inside params are not sorted deterministically,
as sorting arbitrary nested objects (lists, dicts, etc.) would be
difficult and of little benefit.

This will also provide ordered dumps for Python 3.5, where plain dicts
do not preserve insertion order.
  • Loading branch information
skshetry committed Apr 30, 2020
1 parent f9088e1 commit 8f79d64
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 30 deletions.
73 changes: 43 additions & 30 deletions dvc/serialize.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from collections import OrderedDict
from functools import partial
from operator import attrgetter
from typing import TYPE_CHECKING

from funcy import rpartial, lsplit
Expand All @@ -15,9 +18,12 @@
DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE


sort_by_path = partial(sorted, key=attrgetter("def_path"))


def _get_outs(stage: "PipelineStage"):
outs_bucket = {}
for o in stage.outs:
for o in sort_by_path(stage.outs):
bucket_key = ["metrics"] if o.metric else ["outs"]

if not o.metric and o.persist:
Expand All @@ -26,7 +32,7 @@ def _get_outs(stage: "PipelineStage"):
bucket_key += ["no_cache"]
key = "_".join(bucket_key)
outs_bucket[key] = outs_bucket.get(key, []) + [o.def_path]
return outs_bucket
return [(key, outs_bucket[key]) for key in sorted(outs_bucket.keys())]


def get_params_deps(stage: "PipelineStage"):
Expand All @@ -40,60 +46,67 @@ def _serialize_params(params: List[ParamsDependency]):
which is in the shape of:
['lr', 'train', {'params2.yaml': ['lr']}]
`key_vals` - which is list of params with values, used in a lockfile
which is in the shape of:
{'params.yaml': {'lr': '1', 'train': 2}, 'params2.yaml': {'lr': '1'}}
"""
keys = []
key_vals = {}
key_vals = OrderedDict()

for param_dep in params:
for param_dep in sort_by_path(params):
dump = param_dep.dumpd()
path, params = dump[PARAM_PATH], dump[PARAM_PARAMS]
k = list(params.keys())
if not k:
continue
# if it's not a default file, change the shape
# to: {path: k}
keys.extend(k if path == DEFAULT_PARAMS_FILE else [{path: k}])
key_vals.update({path: params})

key_vals[path] = OrderedDict([(key, params[key]) for key in sorted(k)])
# params from default file is always kept at the start of the `params:`
if path == DEFAULT_PARAMS_FILE:
keys = k + keys
key_vals.move_to_end(path, last=False)
else:
# if it's not a default file, change the shape
# to: {path: k}
keys.append({path: k})
return keys, key_vals


def to_pipeline_file(stage: "PipelineStage"):
params, deps = get_params_deps(stage)
serialized_params, _ = _serialize_params(params)

res = [
(stage.PARAM_CMD, stage.cmd),
(stage.PARAM_WDIR, stage.resolve_wdir()),
(stage.PARAM_DEPS, [d.def_path for d in deps]),
(stage.PARAM_PARAMS, serialized_params),
*_get_outs(stage),
(stage.PARAM_LOCKED, stage.locked),
(stage.PARAM_ALWAYS_CHANGED, stage.always_changed),
]
return {
stage.name: {
key: value
for key, value in {
stage.PARAM_CMD: stage.cmd,
stage.PARAM_WDIR: stage.resolve_wdir(),
stage.PARAM_DEPS: [d.def_path for d in deps],
stage.PARAM_PARAMS: serialized_params,
**_get_outs(stage),
stage.PARAM_LOCKED: stage.locked,
stage.PARAM_ALWAYS_CHANGED: stage.always_changed,
}.items()
if value
}
stage.name: OrderedDict([(key, value) for key, value in res if value])
}


def to_lockfile(stage: "PipelineStage") -> dict:
def to_lockfile(stage: "PipelineStage"):
assert stage.cmd
assert stage.name

res = {"cmd": stage.cmd}
res = OrderedDict([("cmd", stage.cmd)])
params, deps = get_params_deps(stage)
deps = [
{"path": dep.def_path, dep.checksum_type: dep.checksum} for dep in deps
]
outs = [
{"path": out.def_path, out.checksum_type: out.checksum}
for out in stage.outs
deps, outs = [
[
OrderedDict(
[
("path", item.def_path),
(item.checksum_type, item.checksum),
]
)
for item in sort_by_path(items)
]
for items in [deps, stage.outs]
]
if deps:
res["deps"] = deps
Expand Down
7 changes: 7 additions & 0 deletions dvc/utils/stage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections import OrderedDict

import yaml
from ruamel.yaml import YAML
from ruamel.yaml.error import YAMLError
Expand Down Expand Up @@ -42,4 +44,9 @@ def dump_stage_file(path, data):
with open(path, "w", encoding="utf-8") as fd:
yaml = YAML()
yaml.default_flow_style = False
# tell Dumper to represent OrderedDict as
# normal dict
yaml.Representer.add_representer(
OrderedDict, yaml.Representer.represent_dict
)
yaml.dump(data, fd)
183 changes: 183 additions & 0 deletions tests/func/test_lockfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import os
from collections import OrderedDict
from operator import itemgetter
from textwrap import dedent

import pytest
import yaml
from dvc.dvcfile import PIPELINE_LOCK
from dvc.serialize import get_params_deps
from dvc.utils.fs import remove
from dvc.utils.stage import parse_stage_for_update

from tests.func.test_run_multistage import supported_params


# Workspace layout shared by every test below: three small two-line text
# files (inputs for the `run_head` fixture) plus two params files with
# identical content (fixture data imported from test_run_multistage).
FS_STRUCTURE = {
    "foo": "bar\nfoobar",
    "bar": "foo\nfoobar",
    "foobar": "foobar\nbar",
    "params.yaml": yaml.dump(supported_params),
    "params2.yaml": yaml.dump(supported_params),
}


@pytest.fixture
def run_head(tmp_dir, dvc):
    """Output first line of each file to different file with '-1' appended.

    Generates a small ``head.py`` script in the workspace, then returns a
    callable that runs it through ``dvc.run`` with the given file names as
    deps and ``<name>-1`` files as outs.
    """
    tmp_dir.gen(
        "head.py",
        dedent(
            """
            import sys
            _, *files = sys.argv
            for file in files:
                with open(file) as f, open(file +"-1","w+") as w:
                    w.write(f.readline())
            """
        ),
    )

    def run(*args, **run_kwargs):
        # Each dep "X" produces an out "X-1" containing X's first line;
        # extra kwargs (name, params, ...) are forwarded to dvc.run.
        return dvc.run(
            cmd="python head.py {}".format(" ".join(args)),
            outs=[dep + "-1" for dep in args],
            deps=args,
            **run_kwargs
        )

    return run


def read_lock_file(file=PIPELINE_LOCK):
    """Parse the lockfile at *file* and return its order-preserving contents."""
    with open(file) as fobj:
        raw = fobj.read()
    parsed = parse_stage_for_update(raw, file)
    # The parser must hand back an order-preserving mapping, otherwise the
    # ordering assertions in these tests would be meaningless.
    assert isinstance(parsed, OrderedDict)
    return parsed


def assert_eq_lockfile(previous, new):
    """Assert two lockfile contents are OrderedDicts and exactly equal."""
    assert isinstance(previous, OrderedDict)
    assert isinstance(new, OrderedDict)
    # `==` between two OrderedDicts is order-sensitive, so this also
    # verifies that the key order matches, not just the key/value pairs.
    assert previous == new


def test_deps_outs_are_sorted_by_path(tmp_dir, dvc, run_head):
    """Lockfile deps/outs are sorted by path, regardless of run order."""
    tmp_dir.gen(FS_STRUCTURE)
    deps = ["foo", "bar", "foobar"]
    run_head(*deps, name="copy-first-line")

    initial_content = read_lock_file()
    lock = initial_content["copy-first-line"]

    # top-level key order inside the locked stage
    assert list(lock.keys()) == ["cmd", "deps", "outs"]

    # within each entry, `path` comes first, then `md5`
    for entry in lock["deps"] + lock["outs"]:
        assert list(entry.keys()) == ["path", "md5"]

    # deps are always sorted by the file path naming
    assert [dep["path"] for dep in lock["deps"]] == sorted(deps)

    # outs are too
    assert [out["path"] for out in lock["outs"]] == [
        d + "-1" for d in sorted(deps)
    ]


def test_order_is_preserved_when_pipeline_order_changes(
    tmp_dir, dvc, run_head
):
    """Reordering outs/deps in the pipeline file must not change the lock."""
    tmp_dir.gen(FS_STRUCTURE)
    deps = ["foo", "bar", "foobar"]
    stage = run_head(*deps, name="copy-first-line")

    initial_content = read_lock_file()
    # reverse stage.outs and dump to the pipeline file, then do the
    # same with stage.deps
    stage.outs.reverse()
    stage.dvcfile._dump_pipeline_file(stage)
    stage.deps.reverse()
    stage.dvcfile._dump_pipeline_file(stage)

    # only the order changed, so nothing should be reproduced
    assert not dvc.reproduce(stage.addressing)

    assert_eq_lockfile(read_lock_file(), initial_content)

    # even regenerating the lockfile from scratch must yield the same dump
    (tmp_dir / PIPELINE_LOCK).unlink()
    assert dvc.reproduce(stage.addressing) == [stage]
    assert_eq_lockfile(read_lock_file(), initial_content)


def test_cmd_changes_other_orders_are_preserved(tmp_dir, dvc, run_head):
    """Changing only `cmd` must leave every other lockfile entry intact."""
    tmp_dir.gen(FS_STRUCTURE)
    stage = run_head("foo", "bar", "foobar", name="copy-first-line")

    initial_content = read_lock_file()
    # rewrite cmd so it differs textually (whitespace collapsed) but is
    # structurally the same command, then dump the pipeline file
    stage.cmd = " ".join(stage.cmd.split())
    stage.dvcfile._dump_pipeline_file(stage)

    # the new lockfile should match the old one except for "cmd"
    initial_content["copy-first-line"]["cmd"] = stage.cmd

    assert dvc.reproduce(stage.addressing) == [stage]

    assert_eq_lockfile(read_lock_file(), initial_content)


def test_params_dump(tmp_dir, dvc, run_head):
    # Params in the lockfile are grouped per params file (default file
    # first) and the keys inside each file are sorted by name, no matter
    # how they were ordered in the pipeline file.
    tmp_dir.gen(FS_STRUCTURE)

    stage = run_head(
        "foo",
        "bar",
        "foobar",
        name="copy-first-line",
        params=[
            "params2.yaml:answer,lists,name",
            "params.yaml:lists,floats,nested.nested1,nested.nested1.nested2",
        ],
    )

    initial_content = read_lock_file()
    lock = initial_content["copy-first-line"]

    # lock stage key order:
    assert list(lock.keys()) == ["cmd", "deps", "params", "outs"]
    assert list(lock["params"].keys()) == ["params.yaml", "params2.yaml"]

    # params keys are always sorted by the name
    assert list(lock["params"]["params.yaml"].keys()) == [
        "floats",
        "lists",
        "nested.nested1",
        "nested.nested1.nested2",
    ]
    assert list(lock["params"]["params2.yaml"]) == ["answer", "lists", "name"]

    assert not dvc.reproduce(stage.addressing)

    # let's change the order of params and dump them in pipeline file
    params, _ = get_params_deps(stage)
    for param in params:
        param.params.reverse()

    stage.dvcfile._dump_pipeline_file(stage)
    # only the order changed, so nothing should be reproduced
    assert not dvc.reproduce(stage.addressing)

    (tmp_dir / PIPELINE_LOCK).unlink()
    # XXX: temporary workaround due to lack of params support in build cache
    remove(os.path.join(dvc.cache.local.cache_dir, "stages"))

    # a from-scratch reproduce must regenerate the exact same lockfile
    assert dvc.reproduce(stage.addressing) == [stage]
    assert_eq_lockfile(initial_content, read_lock_file())

0 comments on commit 8f79d64

Please sign in to comment.