From 8f79d64865fdc77492ca3fcf7edd4eba15916af0 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Thu, 30 Apr 2020 12:04:38 +0545 Subject: [PATCH 1/2] dump: lockfile is dumped deterministically The dump is no longer deterministic/dependent on the pipeline file, but is sorted based on file names in outs, deps or params. Also, the params inside each files are also sorted based on name. However, the objects inside params are not sorted deterministically as I think it's too much to sort that, and is not easy (considering the types of objects it might hold, eg: lists, objects, etc). This will also provide ordered dumps for Python3.5 --- dvc/serialize.py | 73 ++++++++------ dvc/utils/stage.py | 7 ++ tests/func/test_lockfile.py | 183 ++++++++++++++++++++++++++++++++++++ 3 files changed, 233 insertions(+), 30 deletions(-) create mode 100644 tests/func/test_lockfile.py diff --git a/dvc/serialize.py b/dvc/serialize.py index 976a8fa773..d298f70afb 100644 --- a/dvc/serialize.py +++ b/dvc/serialize.py @@ -1,3 +1,6 @@ +from collections import OrderedDict +from functools import partial +from operator import attrgetter from typing import TYPE_CHECKING from funcy import rpartial, lsplit @@ -15,9 +18,12 @@ DEFAULT_PARAMS_FILE = ParamsDependency.DEFAULT_PARAMS_FILE +sort_by_path = partial(sorted, key=attrgetter("def_path")) + + def _get_outs(stage: "PipelineStage"): outs_bucket = {} - for o in stage.outs: + for o in sort_by_path(stage.outs): bucket_key = ["metrics"] if o.metric else ["outs"] if not o.metric and o.persist: @@ -26,7 +32,7 @@ def _get_outs(stage: "PipelineStage"): bucket_key += ["no_cache"] key = "_".join(bucket_key) outs_bucket[key] = outs_bucket.get(key, []) + [o.def_path] - return outs_bucket + return [(key, outs_bucket[key]) for key in sorted(outs_bucket.keys())] def get_params_deps(stage: "PipelineStage"): @@ -40,24 +46,29 @@ def _serialize_params(params: List[ParamsDependency]): which is in the shape of: ['lr', 'train', {'params2.yaml': ['lr']}] + `key_vals` - which is list of params with values, used in a lockfile which is in the shape of: {'params.yaml': {'lr': '1', 'train': 2}, {'params2.yaml': {'lr': '1'}} """ keys = [] - key_vals = {} + key_vals = OrderedDict() - for param_dep in params: + for param_dep in sort_by_path(params): dump = param_dep.dumpd() path, params = dump[PARAM_PATH], dump[PARAM_PARAMS] k = list(params.keys()) if not k: continue - # if it's not a default file, change the shape - # to: {path: k} - keys.extend(k if path == DEFAULT_PARAMS_FILE else [{path: k}]) - key_vals.update({path: params}) - + key_vals[path] = OrderedDict([(key, params[key]) for key in sorted(k)]) + # params from default file is always kept at the start of the `params:` + if path == DEFAULT_PARAMS_FILE: + keys = k + keys + key_vals.move_to_end(path, last=False) + else: + # if it's not a default file, change the shape + # to: {path: k} + keys.append({path: k}) return keys, key_vals @@ -65,35 +76,37 @@ def to_pipeline_file(stage: "PipelineStage"): params, deps = get_params_deps(stage) serialized_params, _ = _serialize_params(params) + res = [ + (stage.PARAM_CMD, stage.cmd), + (stage.PARAM_WDIR, stage.resolve_wdir()), + (stage.PARAM_DEPS, [d.def_path for d in deps]), + (stage.PARAM_PARAMS, serialized_params), + *_get_outs(stage), + (stage.PARAM_LOCKED, stage.locked), + (stage.PARAM_ALWAYS_CHANGED, stage.always_changed), + ] return { - stage.name: { - key: value - for key, value in { - stage.PARAM_CMD: stage.cmd, - stage.PARAM_WDIR: stage.resolve_wdir(), - stage.PARAM_DEPS: [d.def_path for d in deps], - stage.PARAM_PARAMS: serialized_params, - **_get_outs(stage), - stage.PARAM_LOCKED: stage.locked, - stage.PARAM_ALWAYS_CHANGED: stage.always_changed, - }.items() - if value - } + stage.name: OrderedDict([(key, value) for key, value in res if value]) } -def to_lockfile(stage: "PipelineStage") -> dict: +def to_lockfile(stage: "PipelineStage"): assert stage.cmd assert stage.name - res = {"cmd": stage.cmd} + res = OrderedDict([("cmd", stage.cmd)]) params, deps = get_params_deps(stage) - deps = [ - {"path": dep.def_path, dep.checksum_type: dep.checksum} for dep in deps - ] - outs = [ - {"path": out.def_path, out.checksum_type: out.checksum} - for out in stage.outs + deps, outs = [ + [ + OrderedDict( + [ + ("path", item.def_path), + (item.checksum_type, item.checksum), + ] + ) + for item in sort_by_path(items) + ] + for items in [deps, stage.outs] ] if deps: res["deps"] = deps diff --git a/dvc/utils/stage.py b/dvc/utils/stage.py index ce5a797144..2b322398c4 100644 --- a/dvc/utils/stage.py +++ b/dvc/utils/stage.py @@ -1,3 +1,5 @@ +from collections import OrderedDict + import yaml from ruamel.yaml import YAML from ruamel.yaml.error import YAMLError @@ -42,4 +44,9 @@ def dump_stage_file(path, data): with open(path, "w", encoding="utf-8") as fd: yaml = YAML() yaml.default_flow_style = False + # tell Dumper to represent OrderedDict as + # normal dict + yaml.Representer.add_representer( + OrderedDict, yaml.Representer.represent_dict + ) yaml.dump(data, fd) diff --git a/tests/func/test_lockfile.py b/tests/func/test_lockfile.py new file mode 100644 index 0000000000..d3bc409855 --- /dev/null +++ b/tests/func/test_lockfile.py @@ -0,0 +1,183 @@ +import os +from collections import OrderedDict +from operator import itemgetter +from textwrap import dedent + +import pytest +import yaml +from dvc.dvcfile import PIPELINE_LOCK +from dvc.serialize import get_params_deps +from dvc.utils.fs import remove +from dvc.utils.stage import parse_stage_for_update + +from tests.func.test_run_multistage import supported_params + + +FS_STRUCTURE = { + "foo": "bar\nfoobar", + "bar": "foo\nfoobar", + "foobar": "foobar\nbar", + "params.yaml": yaml.dump(supported_params), + "params2.yaml": yaml.dump(supported_params), +} + + +@pytest.fixture +def run_head(tmp_dir, dvc): + """Output first line of each file to different file with '-1' appended.""" + tmp_dir.gen( + "head.py", + dedent( + """ + import sys + _, *files = sys.argv + for file in files: + with open(file) as f, open(file +"-1","w+") as w: + w.write(f.readline()) + """ + ), + ) + + def run(*args, **run_kwargs): + return dvc.run( + cmd="python head.py {}".format(" ".join(args)), + outs=[dep + "-1" for dep in args], + deps=args, + **run_kwargs + ) + + return run + + +def read_lock_file(file=PIPELINE_LOCK): + with open(file) as f: + data = parse_stage_for_update(f.read(), file) + assert isinstance(data, OrderedDict) + return data + + +def assert_eq_lockfile(previous, new): + for content in (previous, new): + assert isinstance(content, OrderedDict) + + # if they both are OrderedDict, then `==` will also check for order + assert previous == new + + +def test_deps_outs_are_sorted_by_path(tmp_dir, dvc, run_head): + tmp_dir.gen(FS_STRUCTURE) + deps = ["foo", "bar", "foobar"] + run_head(*deps, name="copy-first-line") + + initial_content = read_lock_file() + lock = initial_content["copy-first-line"] + + # lock stage key order: + assert list(lock.keys()) == ["cmd", "deps", "outs"] + + # `path` key appear first and then the `md5` + assert all(list(dep.keys()) == ["path", "md5"] for dep in lock["deps"]) + assert all(list(out.keys()) == ["path", "md5"] for out in lock["outs"]) + + # deps are always sorted by the file path naming + assert list(map(itemgetter("path"), lock["deps"])) == sorted(deps) + + # outs are too + assert list( + map(itemgetter("path"), initial_content["copy-first-line"]["outs"]) + ) == [d + "-1" for d in sorted(deps)] + + +def test_order_is_preserved_when_pipeline_order_changes( + tmp_dir, dvc, run_head +): + tmp_dir.gen(FS_STRUCTURE) + deps = ["foo", "bar", "foobar"] + stage = run_head(*deps, name="copy-first-line") + + initial_content = read_lock_file() + # reverse order of stage.outs and dump to the pipeline file + # then, again change stage.deps and dump to the pipeline file + reversal = stage.outs.reverse, stage.deps.reverse + for reverse_items in reversal: + reverse_items() + stage.dvcfile._dump_pipeline_file(stage) + + # we only changed the order, should not reproduce + assert not dvc.reproduce(stage.addressing) + + new_lock_content = read_lock_file() + assert_eq_lockfile(new_lock_content, initial_content) + + (tmp_dir / PIPELINE_LOCK).unlink() + assert dvc.reproduce(stage.addressing) == [stage] + new_lock_content = read_lock_file() + assert_eq_lockfile(new_lock_content, initial_content) + + +def test_cmd_changes_other_orders_are_preserved(tmp_dir, dvc, run_head): + tmp_dir.gen(FS_STRUCTURE) + deps = ["foo", "bar", "foobar"] + stage = run_head(*deps, name="copy-first-line") + + initial_content = read_lock_file() + # let's change cmd in pipeline file + # it should only change "cmd", otherwise it should be + # structurally same as cmd + stage.cmd = " ".join(stage.cmd.split()) + stage.dvcfile._dump_pipeline_file(stage) + + initial_content["copy-first-line"]["cmd"] = stage.cmd + + assert dvc.reproduce(stage.addressing) == [stage] + + new_lock_content = read_lock_file() + assert_eq_lockfile(new_lock_content, initial_content) + + +def test_params_dump(tmp_dir, dvc, run_head): + tmp_dir.gen(FS_STRUCTURE) + + stage = run_head( + "foo", + "bar", + "foobar", + name="copy-first-line", + params=[ + "params2.yaml:answer,lists,name", + "params.yaml:lists,floats,nested.nested1,nested.nested1.nested2", + ], + ) + + initial_content = read_lock_file() + lock = initial_content["copy-first-line"] + + # lock stage key order: + assert list(lock.keys()) == ["cmd", "deps", "params", "outs"] + assert list(lock["params"].keys()) == ["params.yaml", "params2.yaml"] + + # # params keys are always sorted by the name + assert list(lock["params"]["params.yaml"].keys()) == [ + "floats", + "lists", + "nested.nested1", + "nested.nested1.nested2", + ] + assert list(lock["params"]["params2.yaml"]) == ["answer", "lists", "name"] + + assert not dvc.reproduce(stage.addressing) + + # let's change the order of params and dump them in pipeline file + params, _ = get_params_deps(stage) + for param in params: + param.params.reverse() + + stage.dvcfile._dump_pipeline_file(stage) + assert not dvc.reproduce(stage.addressing) + + (tmp_dir / PIPELINE_LOCK).unlink() + # XXX: temporary workaround due to lack of params support in build cache + remove(os.path.join(dvc.cache.local.cache_dir, "stages")) + + assert dvc.reproduce(stage.addressing) == [stage] + assert_eq_lockfile(initial_content, read_lock_file()) From c95f974f7dda4216a3f6a35d512cfb899b11e80d Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Thu, 30 Apr 2020 16:27:53 +0000 Subject: [PATCH 2/2] Restyled by black --- dvc/serialize.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dvc/serialize.py b/dvc/serialize.py index d298f70afb..46e6895b59 100644 --- a/dvc/serialize.py +++ b/dvc/serialize.py @@ -99,10 +99,7 @@ def to_lockfile(stage: "PipelineStage"): deps, outs = [ [ OrderedDict( - [ - ("path", item.def_path), - (item.checksum_type, item.checksum), - ] + [("path", item.def_path), (item.checksum_type, item.checksum),] ) for item in sort_by_path(items) ]