Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restyle dump: lockfile dump deterministically #3712

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 40 additions & 30 deletions dvc/serialize.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from collections import OrderedDict
from functools import partial
from operator import attrgetter
from typing import TYPE_CHECKING

from funcy import rpartial, lsplit
Expand All @@ -15,9 +18,12 @@
DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE


sort_by_path = partial(sorted, key=attrgetter("def_path"))


def _get_outs(stage: "PipelineStage"):
outs_bucket = {}
for o in stage.outs:
for o in sort_by_path(stage.outs):
bucket_key = ["metrics"] if o.metric else ["outs"]

if not o.metric and o.persist:
Expand All @@ -26,7 +32,7 @@ def _get_outs(stage: "PipelineStage"):
bucket_key += ["no_cache"]
key = "_".join(bucket_key)
outs_bucket[key] = outs_bucket.get(key, []) + [o.def_path]
return outs_bucket
return [(key, outs_bucket[key]) for key in sorted(outs_bucket.keys())]


def get_params_deps(stage: "PipelineStage"):
Expand All @@ -40,60 +46,64 @@ def _serialize_params(params: List[ParamsDependency]):

which is in the shape of:
['lr', 'train', {'params2.yaml': ['lr']}]

`key_vals` - which is list of params with values, used in a lockfile
which is in the shape of:
{'params.yaml': {'lr': '1', 'train': 2}, {'params2.yaml': {'lr': '1'}}
"""
keys = []
key_vals = {}
key_vals = OrderedDict()

for param_dep in params:
for param_dep in sort_by_path(params):
dump = param_dep.dumpd()
path, params = dump[PARAM_PATH], dump[PARAM_PARAMS]
k = list(params.keys())
if not k:
continue
# if it's not a default file, change the shape
# to: {path: k}
keys.extend(k if path == DEFAULT_PARAMS_FILE else [{path: k}])
key_vals.update({path: params})

key_vals[path] = OrderedDict([(key, params[key]) for key in sorted(k)])
# params from default file is always kept at the start of the `params:`
if path == DEFAULT_PARAMS_FILE:
keys = k + keys
key_vals.move_to_end(path, last=False)
else:
# if it's not a default file, change the shape
# to: {path: k}
keys.append({path: k})
return keys, key_vals


def to_pipeline_file(stage: "PipelineStage"):
params, deps = get_params_deps(stage)
serialized_params, _ = _serialize_params(params)

res = [
(stage.PARAM_CMD, stage.cmd),
(stage.PARAM_WDIR, stage.resolve_wdir()),
(stage.PARAM_DEPS, [d.def_path for d in deps]),
(stage.PARAM_PARAMS, serialized_params),
*_get_outs(stage),
(stage.PARAM_LOCKED, stage.locked),
(stage.PARAM_ALWAYS_CHANGED, stage.always_changed),
]
return {
stage.name: {
key: value
for key, value in {
stage.PARAM_CMD: stage.cmd,
stage.PARAM_WDIR: stage.resolve_wdir(),
stage.PARAM_DEPS: [d.def_path for d in deps],
stage.PARAM_PARAMS: serialized_params,
**_get_outs(stage),
stage.PARAM_LOCKED: stage.locked,
stage.PARAM_ALWAYS_CHANGED: stage.always_changed,
}.items()
if value
}
stage.name: OrderedDict([(key, value) for key, value in res if value])
}


def to_lockfile(stage: "PipelineStage") -> dict:
def to_lockfile(stage: "PipelineStage"):
assert stage.cmd
assert stage.name

res = {"cmd": stage.cmd}
res = OrderedDict([("cmd", stage.cmd)])
params, deps = get_params_deps(stage)
deps = [
{"path": dep.def_path, dep.checksum_type: dep.checksum} for dep in deps
]
outs = [
{"path": out.def_path, out.checksum_type: out.checksum}
for out in stage.outs
deps, outs = [
[
OrderedDict(
[("path", item.def_path), (item.checksum_type, item.checksum),]
)
for item in sort_by_path(items)
]
for items in [deps, stage.outs]
]
if deps:
res["deps"] = deps
Expand Down
7 changes: 7 additions & 0 deletions dvc/utils/stage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections import OrderedDict

import yaml
from ruamel.yaml import YAML
from ruamel.yaml.error import YAMLError
Expand Down Expand Up @@ -42,4 +44,9 @@ def dump_stage_file(path, data):
with open(path, "w", encoding="utf-8") as fd:
yaml = YAML()
yaml.default_flow_style = False
# tell Dumper to represent OrderedDict as
# normal dict
yaml.Representer.add_representer(
OrderedDict, yaml.Representer.represent_dict
)
yaml.dump(data, fd)
183 changes: 183 additions & 0 deletions tests/func/test_lockfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import os
from collections import OrderedDict
from operator import itemgetter
from textwrap import dedent

import pytest
import yaml
from dvc.dvcfile import PIPELINE_LOCK
from dvc.serialize import get_params_deps
from dvc.utils.fs import remove
from dvc.utils.stage import parse_stage_for_update

from tests.func.test_run_multistage import supported_params


FS_STRUCTURE = {
"foo": "bar\nfoobar",
"bar": "foo\nfoobar",
"foobar": "foobar\nbar",
"params.yaml": yaml.dump(supported_params),
"params2.yaml": yaml.dump(supported_params),
}


@pytest.fixture
def run_head(tmp_dir, dvc):
"""Output first line of each file to different file with '-1' appended."""
tmp_dir.gen(
"head.py",
dedent(
"""
import sys
_, *files = sys.argv
for file in files:
with open(file) as f, open(file +"-1","w+") as w:
w.write(f.readline())
"""
),
)

def run(*args, **run_kwargs):
return dvc.run(
cmd="python head.py {}".format(" ".join(args)),
outs=[dep + "-1" for dep in args],
deps=args,
**run_kwargs
)

return run


def read_lock_file(file=PIPELINE_LOCK):
with open(file) as f:
data = parse_stage_for_update(f.read(), file)
assert isinstance(data, OrderedDict)
return data


def assert_eq_lockfile(previous, new):
for content in (previous, new):
assert isinstance(content, OrderedDict)

# if they both are OrderedDict, then `==` will also check for order
assert previous == new


def test_deps_outs_are_sorted_by_path(tmp_dir, dvc, run_head):
tmp_dir.gen(FS_STRUCTURE)
deps = ["foo", "bar", "foobar"]
run_head(*deps, name="copy-first-line")

initial_content = read_lock_file()
lock = initial_content["copy-first-line"]

# lock stage key order:
assert list(lock.keys()) == ["cmd", "deps", "outs"]

# `path` key appear first and then the `md5`
assert all(list(dep.keys()) == ["path", "md5"] for dep in lock["deps"])
assert all(list(out.keys()) == ["path", "md5"] for out in lock["outs"])

# deps are always sorted by the file path naming
assert list(map(itemgetter("path"), lock["deps"])) == sorted(deps)

# outs are too
assert list(
map(itemgetter("path"), initial_content["copy-first-line"]["outs"])
) == [d + "-1" for d in sorted(deps)]


def test_order_is_preserved_when_pipeline_order_changes(
tmp_dir, dvc, run_head
):
tmp_dir.gen(FS_STRUCTURE)
deps = ["foo", "bar", "foobar"]
stage = run_head(*deps, name="copy-first-line")

initial_content = read_lock_file()
# reverse order of stage.outs and dump to the pipeline file
# then, again change stage.deps and dump to the pipeline file
reversal = stage.outs.reverse, stage.deps.reverse
for reverse_items in reversal:
reverse_items()
stage.dvcfile._dump_pipeline_file(stage)

# we only changed the order, should not reproduce
assert not dvc.reproduce(stage.addressing)

new_lock_content = read_lock_file()
assert_eq_lockfile(new_lock_content, initial_content)

(tmp_dir / PIPELINE_LOCK).unlink()
assert dvc.reproduce(stage.addressing) == [stage]
new_lock_content = read_lock_file()
assert_eq_lockfile(new_lock_content, initial_content)


def test_cmd_changes_other_orders_are_preserved(tmp_dir, dvc, run_head):
tmp_dir.gen(FS_STRUCTURE)
deps = ["foo", "bar", "foobar"]
stage = run_head(*deps, name="copy-first-line")

initial_content = read_lock_file()
# let's change cmd in pipeline file
# it should only change "cmd", otherwise it should be
# structurally same as cmd
stage.cmd = " ".join(stage.cmd.split())
stage.dvcfile._dump_pipeline_file(stage)

initial_content["copy-first-line"]["cmd"] = stage.cmd

assert dvc.reproduce(stage.addressing) == [stage]

new_lock_content = read_lock_file()
assert_eq_lockfile(new_lock_content, initial_content)


def test_params_dump(tmp_dir, dvc, run_head):
tmp_dir.gen(FS_STRUCTURE)

stage = run_head(
"foo",
"bar",
"foobar",
name="copy-first-line",
params=[
"params2.yaml:answer,lists,name",
"params.yaml:lists,floats,nested.nested1,nested.nested1.nested2",
],
)

initial_content = read_lock_file()
lock = initial_content["copy-first-line"]

# lock stage key order:
assert list(lock.keys()) == ["cmd", "deps", "params", "outs"]
assert list(lock["params"].keys()) == ["params.yaml", "params2.yaml"]

# # params keys are always sorted by the name
assert list(lock["params"]["params.yaml"].keys()) == [
"floats",
"lists",
"nested.nested1",
"nested.nested1.nested2",
]
assert list(lock["params"]["params2.yaml"]) == ["answer", "lists", "name"]

assert not dvc.reproduce(stage.addressing)

# let's change the order of params and dump them in pipeline file
params, _ = get_params_deps(stage)
for param in params:
param.params.reverse()

stage.dvcfile._dump_pipeline_file(stage)
assert not dvc.reproduce(stage.addressing)

(tmp_dir / PIPELINE_LOCK).unlink()
# XXX: temporary workaround due to lack of params support in build cache
remove(os.path.join(dvc.cache.local.cache_dir, "stages"))

assert dvc.reproduce(stage.addressing) == [stage]
assert_eq_lockfile(initial_content, read_lock_file())