diff --git a/dvc/serialize.py b/dvc/serialize.py
index 976a8fa773..46e6895b59 100644
--- a/dvc/serialize.py
+++ b/dvc/serialize.py
@@ -1,3 +1,6 @@
+from collections import OrderedDict
+from functools import partial
+from operator import attrgetter
 from typing import TYPE_CHECKING
 
 from funcy import rpartial, lsplit
@@ -15,9 +18,12 @@
 DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE
 
 
+sort_by_path = partial(sorted, key=attrgetter("def_path"))
+
+
 def _get_outs(stage: "PipelineStage"):
     outs_bucket = {}
-    for o in stage.outs:
+    for o in sort_by_path(stage.outs):
         bucket_key = ["metrics"] if o.metric else ["outs"]
 
         if not o.metric and o.persist:
@@ -26,7 +32,7 @@ def _get_outs(stage: "PipelineStage"):
             bucket_key += ["no_cache"]
         key = "_".join(bucket_key)
         outs_bucket[key] = outs_bucket.get(key, []) + [o.def_path]
-    return outs_bucket
+    return [(key, outs_bucket[key]) for key in sorted(outs_bucket.keys())]
 
 
 def get_params_deps(stage: "PipelineStage"):
@@ -40,24 +46,29 @@ def _serialize_params(params: List[ParamsDependency]):
         which is in the shape of:
             ['lr', 'train', {'params2.yaml': ['lr']}]
 
+
         `key_vals` - which is list of params with values, used in a lockfile
         which is in the shape of:
             {'params.yaml': {'lr': '1', 'train': 2}, {'params2.yaml': {'lr': '1'}}
     """
     keys = []
-    key_vals = {}
+    key_vals = OrderedDict()
 
-    for param_dep in params:
+    for param_dep in sort_by_path(params):
         dump = param_dep.dumpd()
         path, params = dump[PARAM_PATH], dump[PARAM_PARAMS]
         k = list(params.keys())
         if not k:
             continue
 
-        # if it's not a default file, change the shape
-        # to: {path: k}
-        keys.extend(k if path == DEFAULT_PARAMS_FILE else [{path: k}])
-        key_vals.update({path: params})
-
+        key_vals[path] = OrderedDict([(key, params[key]) for key in sorted(k)])
+        # params from default file is always kept at the start of the `params:`
+        if path == DEFAULT_PARAMS_FILE:
+            keys = k + keys
+            key_vals.move_to_end(path, last=False)
+        else:
+            # if it's not a default file, change the shape
+            # to: {path: k}
+            keys.append({path: k})
     return keys, key_vals
@@ -65,35 +76,34 @@ def to_pipeline_file(stage: "PipelineStage"):
     params, deps = get_params_deps(stage)
     serialized_params, _ = _serialize_params(params)
 
+    res = [
+        (stage.PARAM_CMD, stage.cmd),
+        (stage.PARAM_WDIR, stage.resolve_wdir()),
+        (stage.PARAM_DEPS, [d.def_path for d in deps]),
+        (stage.PARAM_PARAMS, serialized_params),
+        *_get_outs(stage),
+        (stage.PARAM_LOCKED, stage.locked),
+        (stage.PARAM_ALWAYS_CHANGED, stage.always_changed),
+    ]
     return {
-        stage.name: {
-            key: value
-            for key, value in {
-                stage.PARAM_CMD: stage.cmd,
-                stage.PARAM_WDIR: stage.resolve_wdir(),
-                stage.PARAM_DEPS: [d.def_path for d in deps],
-                stage.PARAM_PARAMS: serialized_params,
-                **_get_outs(stage),
-                stage.PARAM_LOCKED: stage.locked,
-                stage.PARAM_ALWAYS_CHANGED: stage.always_changed,
-            }.items()
-            if value
-        }
+        stage.name: OrderedDict([(key, value) for key, value in res if value])
     }
 
 
-def to_lockfile(stage: "PipelineStage") -> dict:
+def to_lockfile(stage: "PipelineStage"):
     assert stage.cmd
     assert stage.name
 
-    res = {"cmd": stage.cmd}
+    res = OrderedDict([("cmd", stage.cmd)])
     params, deps = get_params_deps(stage)
-    deps = [
-        {"path": dep.def_path, dep.checksum_type: dep.checksum} for dep in deps
-    ]
-    outs = [
-        {"path": out.def_path, out.checksum_type: out.checksum}
-        for out in stage.outs
+    deps, outs = [
+        [
+            OrderedDict(
+                [("path", item.def_path), (item.checksum_type, item.checksum),]
+            )
+            for item in sort_by_path(items)
+        ]
+        for items in [deps, stage.outs]
     ]
     if deps:
         res["deps"] = deps
diff --git a/dvc/utils/stage.py b/dvc/utils/stage.py
index ce5a797144..2b322398c4 100644
--- a/dvc/utils/stage.py
+++ b/dvc/utils/stage.py
@@ -1,3 +1,5 @@
+from collections import OrderedDict
+
 import yaml
 from ruamel.yaml import YAML
 from ruamel.yaml.error import YAMLError
@@ -42,4 +44,9 @@ def dump_stage_file(path, data):
     with open(path, "w", encoding="utf-8") as fd:
         yaml = YAML()
         yaml.default_flow_style = False
+        # tell Dumper to represent OrderedDict as
+        # normal dict
+        yaml.Representer.add_representer(
+            OrderedDict, yaml.Representer.represent_dict
+        )
         yaml.dump(data, fd)
diff --git a/tests/func/test_lockfile.py b/tests/func/test_lockfile.py
new file mode 100644
index 0000000000..d3bc409855
--- /dev/null
+++ b/tests/func/test_lockfile.py
@@ -0,0 +1,183 @@
+import os
+from collections import OrderedDict
+from operator import itemgetter
+from textwrap import dedent
+
+import pytest
+import yaml
+from dvc.dvcfile import PIPELINE_LOCK
+from dvc.serialize import get_params_deps
+from dvc.utils.fs import remove
+from dvc.utils.stage import parse_stage_for_update
+
+from tests.func.test_run_multistage import supported_params
+
+
+FS_STRUCTURE = {
+    "foo": "bar\nfoobar",
+    "bar": "foo\nfoobar",
+    "foobar": "foobar\nbar",
+    "params.yaml": yaml.dump(supported_params),
+    "params2.yaml": yaml.dump(supported_params),
+}
+
+
+@pytest.fixture
+def run_head(tmp_dir, dvc):
+    """Output first line of each file to different file with '-1' appended."""
+    tmp_dir.gen(
+        "head.py",
+        dedent(
+            """
+            import sys
+            _, *files = sys.argv
+            for file in files:
+                with open(file) as f, open(file +"-1","w+") as w:
+                    w.write(f.readline())
+            """
+        ),
+    )
+
+    def run(*args, **run_kwargs):
+        return dvc.run(
+            cmd="python head.py {}".format(" ".join(args)),
+            outs=[dep + "-1" for dep in args],
+            deps=args,
+            **run_kwargs
+        )
+
+    return run
+
+
+def read_lock_file(file=PIPELINE_LOCK):
+    with open(file) as f:
+        data = parse_stage_for_update(f.read(), file)
+    assert isinstance(data, OrderedDict)
+    return data
+
+
+def assert_eq_lockfile(previous, new):
+    for content in (previous, new):
+        assert isinstance(content, OrderedDict)
+
+    # if they both are OrderedDict, then `==` will also check for order
+    assert previous == new
+
+
+def test_deps_outs_are_sorted_by_path(tmp_dir, dvc, run_head):
+    tmp_dir.gen(FS_STRUCTURE)
+    deps = ["foo", "bar", "foobar"]
+    run_head(*deps, name="copy-first-line")
+
+    initial_content = read_lock_file()
+    lock = initial_content["copy-first-line"]
+
+    # lock stage key order:
+    assert list(lock.keys()) == ["cmd", "deps", "outs"]
+
+    # `path` key appear first and then the `md5`
+    assert all(list(dep.keys()) == ["path", "md5"] for dep in lock["deps"])
+    assert all(list(out.keys()) == ["path", "md5"] for out in lock["outs"])
+
+    # deps are always sorted by the file path naming
+    assert list(map(itemgetter("path"), lock["deps"])) == sorted(deps)
+
+    # outs are too
+    assert list(
+        map(itemgetter("path"), initial_content["copy-first-line"]["outs"])
+    ) == [d + "-1" for d in sorted(deps)]
+
+
+def test_order_is_preserved_when_pipeline_order_changes(
+    tmp_dir, dvc, run_head
+):
+    tmp_dir.gen(FS_STRUCTURE)
+    deps = ["foo", "bar", "foobar"]
+    stage = run_head(*deps, name="copy-first-line")
+
+    initial_content = read_lock_file()
+    # reverse order of stage.outs and dump to the pipeline file
+    # then, again change stage.deps and dump to the pipeline file
+    reversal = stage.outs.reverse, stage.deps.reverse
+    for reverse_items in reversal:
+        reverse_items()
+        stage.dvcfile._dump_pipeline_file(stage)
+
+        # we only changed the order, should not reproduce
+        assert not dvc.reproduce(stage.addressing)
+
+        new_lock_content = read_lock_file()
+        assert_eq_lockfile(new_lock_content, initial_content)
+
+        (tmp_dir / PIPELINE_LOCK).unlink()
+        assert dvc.reproduce(stage.addressing) == [stage]
+        new_lock_content = read_lock_file()
+        assert_eq_lockfile(new_lock_content, initial_content)
+
+
+def test_cmd_changes_other_orders_are_preserved(tmp_dir, dvc, run_head):
+    tmp_dir.gen(FS_STRUCTURE)
+    deps = ["foo", "bar", "foobar"]
+    stage = run_head(*deps, name="copy-first-line")
+
+    initial_content = read_lock_file()
+    # let's change cmd in pipeline file
+    # it should only change "cmd", otherwise it should be
+    # structurally same as cmd
+    stage.cmd = "  ".join(stage.cmd.split())
+    stage.dvcfile._dump_pipeline_file(stage)
+
+    initial_content["copy-first-line"]["cmd"] = stage.cmd
+
+    assert dvc.reproduce(stage.addressing) == [stage]
+
+    new_lock_content = read_lock_file()
+    assert_eq_lockfile(new_lock_content, initial_content)
+
+
+def test_params_dump(tmp_dir, dvc, run_head):
+    tmp_dir.gen(FS_STRUCTURE)
+
+    stage = run_head(
+        "foo",
+        "bar",
+        "foobar",
+        name="copy-first-line",
+        params=[
+            "params2.yaml:answer,lists,name",
+            "params.yaml:lists,floats,nested.nested1,nested.nested1.nested2",
+        ],
+    )
+
+    initial_content = read_lock_file()
+    lock = initial_content["copy-first-line"]
+
+    # lock stage key order:
+    assert list(lock.keys()) == ["cmd", "deps", "params", "outs"]
+    assert list(lock["params"].keys()) == ["params.yaml", "params2.yaml"]
+
+    # params keys are always sorted by the name
+    assert list(lock["params"]["params.yaml"].keys()) == [
+        "floats",
+        "lists",
+        "nested.nested1",
+        "nested.nested1.nested2",
+    ]
+    assert list(lock["params"]["params2.yaml"]) == ["answer", "lists", "name"]
+
+    assert not dvc.reproduce(stage.addressing)
+
+    # let's change the order of params and dump them in pipeline file
+    params, _ = get_params_deps(stage)
+    for param in params:
+        param.params.reverse()
+
+    stage.dvcfile._dump_pipeline_file(stage)
+    assert not dvc.reproduce(stage.addressing)
+
+    (tmp_dir / PIPELINE_LOCK).unlink()
+    # XXX: temporary workaround due to lack of params support in build cache
+    remove(os.path.join(dvc.cache.local.cache_dir, "stages"))
+
+    assert dvc.reproduce(stage.addressing) == [stage]
+    assert_eq_lockfile(initial_content, read_lock_file())