Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add versioning scheme in the dvc.lock #5128

Merged
merged 3 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 64 additions & 14 deletions dvc/dvcfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from voluptuous import MultipleInvalid

from dvc.exceptions import DvcException
from dvc.parsing.versions import LOCKFILE_VERSION, META_KWD
from dvc.stage import serialize
from dvc.stage.exceptions import (
StageFileBadNameError,
Expand Down Expand Up @@ -113,14 +114,13 @@ def _load(self):
with self.repo.tree.open(self.path) as fd:
stage_text = fd.read()
d = parse_yaml(stage_text, self.path)
self.validate(d, self.relpath)
return d, stage_text
return self.validate(d, self.relpath), stage_text

@classmethod
def validate(cls, d, fname=None):
assert isinstance(cls.SCHEMA, collections.abc.Callable)
try:
cls.SCHEMA(d) # pylint: disable=not-callable
return cls.SCHEMA(d) # pylint: disable=not-callable
except MultipleInvalid as exc:
raise StageFileFormatError(f"'{fname}' format error: {exc}")

Expand Down Expand Up @@ -273,35 +273,83 @@ def merge(self, ancestor, other):
raise NotImplementedError


def get_lockfile_schema(d):
    """Return the compiled lockfile schema matching the version of ``d``.

    The version is detected with ``LOCKFILE_VERSION.from_dict(d)`` and used
    to look up the corresponding compiled schema.
    """
    # Imported lazily inside the function, as in the original code.
    from dvc.schema import (
        COMPILED_LOCKFILE_V1_SCHEMA,
        COMPILED_LOCKFILE_V2_SCHEMA,
    )

    detected = LOCKFILE_VERSION.from_dict(d)
    schema_by_version = {
        LOCKFILE_VERSION.V1: COMPILED_LOCKFILE_V1_SCHEMA,
        LOCKFILE_VERSION.V2: COMPILED_LOCKFILE_V2_SCHEMA,
    }
    return schema_by_version[detected]


def migrate_lock_v1_to_v2(d, meta):
    """Convert a v1 lockfile mapping to the v2 layout, in place.

    In v1 the top-level keys of ``d`` are the stage entries themselves. After
    this call ``d`` contains the entries of ``meta`` first, followed by a
    single ``"stages"`` key holding everything that used to be at the top
    level.

    Args:
        d: mutable mapping with the v1 lockfile contents; it is modified in
            place and nothing is returned.
        meta: mapping merged into ``d`` before ``"stages"`` is added.
    """
    # Snapshot the current top-level entries before emptying ``d``.
    stages = dict(d.items())

    # Remove the old entries from the same object rather than rebinding it,
    # so whoever handed us ``d`` sees the migrated content.
    for key in stages:
        d.pop(key)

    # forcing order, meta should always be at the top
    d.update(meta)
    d["stages"] = stages


class Lockfile(FileMixin):
from dvc.schema import COMPILED_LOCKFILE_SCHEMA as SCHEMA
@classmethod
def validate(cls, d, fname=None):
schema = get_lockfile_schema(d)
try:
return schema(d)
except MultipleInvalid as exc:
raise StageFileFormatError(f"'{fname}' format error: {exc}")

def load(self):
if not self.exists():
return {}

data = load_yaml(self.path, tree=self.repo.tree)
try:
self.validate(data, fname=self.relpath)
except StageFileFormatError:
data = self.validate(data, fname=self.relpath)
except StageFileFormatError as exc:
raise LockfileCorruptedError(
f"Lockfile '{self.relpath}' is corrupted."
)
) from exc
return data

@property
def meta(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will definitely get confused with stage meta :(

version = LOCKFILE_VERSION.V2.value # pylint:disable=no-member
return {META_KWD: {"version": version}}

def dump(self, stage, **kwargs):
stage_data = serialize.to_lockfile(stage)

with modify_yaml(self.path, tree=self.repo.tree) as data:
if not data:
logger.info("Generating lock file '%s'", self.relpath)
version = LOCKFILE_VERSION.from_dict(data)
if version == LOCKFILE_VERSION.V1:
logger.info(
"Migrating lock file '%s' from v1 to v2", self.relpath
)
migrate_lock_v1_to_v2(data, self.meta)
else:
if not data:
data.update(self.meta)
# order is important, meta should always be at the top
logger.info("Generating lock file '%s'", self.relpath)

modified = data.get(stage.name, {}) != stage_data.get(
data["stages"] = data.get("stages", {})
modified = data["stages"].get(stage.name, {}) != stage_data.get(
stage.name, {}
)
if modified:
logger.info("Updating lock file '%s'", self.relpath)
data.update(stage_data)

data["stages"].update(stage_data)

if modified:
self.repo.scm.track_file(self.relpath)
Expand All @@ -314,13 +362,15 @@ def remove_stage(self, stage):
d = parse_yaml_for_update(f.read(), self.path)
self.validate(d, self.path)

if stage.name not in d:
version = LOCKFILE_VERSION.from_dict(d)
data = d if version == LOCKFILE_VERSION.V1 else d.get("stages", {})
if stage.name not in data:
return

logger.debug("Removing '%s' from '%s'", stage.name, self.path)
del d[stage.name]
del data[stage.name]

if d:
if data:
dump_yaml(self.path, d)
else:
self.remove()
Expand Down
38 changes: 38 additions & 0 deletions dvc/parsing/versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import enum
from collections.abc import Mapping

from voluptuous import validators

# Keys used by the lockfile's metadata section: ``{META_KWD: {VERSION_KWD: ...}}``.
VERSION_KWD = "version"
META_KWD = "meta"


def lockfile_version_schema(value):
    """Validate a lockfile version value against the accepted versions.

    Only the V2 version string is accepted. The check is delegated to
    voluptuous' ``Any`` validator, called with ``value``; its result is
    returned. On mismatch the validator is given a message naming the
    offending value and the accepted versions.
    """
    expected = [LOCKFILE_VERSION.V2.value]  # pylint: disable=no-member
    msg = "invalid version {}, expected one of {}".format(value, expected)
    return validators.Any(*expected, msg=msg)(value)


class VersionEnum(str, enum.Enum):
    """Base class for string-valued version enums."""

    @classmethod
    def all_versions(cls):
        """Return the values of all members, in definition order."""
        return [v.value for v in cls]


class LOCKFILE_VERSION(VersionEnum):
    """Known lockfile format versions."""

    V1 = "1.0"
    V2 = "2.0"

    @classmethod
    def from_dict(cls, data):
        """Detect which lockfile version ``data`` is written in.

        Rules:
          1) if ``data`` is empty or is not a mapping, use the latest (V2);
          2) otherwise use ``meta.version`` when it names a known version;
          3) if ``meta.version`` is set but unknown, use the latest (V2);
          4) if there is no version identifier, it is a V1.

        A ``meta`` entry that is not a mapping (for example ``None`` from an
        empty YAML key) carries no version identifier, so rule 4 applies
        instead of failing with ``AttributeError``.
        """
        if not data or not isinstance(data, Mapping):
            return cls.V2

        meta = data.get(META_KWD)
        if not isinstance(meta, Mapping):
            return cls.V1

        version = meta.get(VERSION_KWD)
        if not version:
            return cls.V1
        return cls(version) if version in cls.all_versions() else cls.V2
15 changes: 12 additions & 3 deletions dvc/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dvc.hash_info import HashInfo
from dvc.output import CHECKSUMS_SCHEMA, BaseOutput
from dvc.parsing import DO_KWD, FOREACH_KWD, VARS_KWD
from dvc.parsing.versions import META_KWD, VERSION_KWD, lockfile_version_schema
from dvc.stage.params import StageParams

STAGES = "stages"
Expand All @@ -13,7 +14,7 @@
StageParams.PARAM_WDIR: Any(str, None),
StageParams.PARAM_DEPS: Any([dependency.SCHEMA], None),
StageParams.PARAM_OUTS: Any([output.SCHEMA], None),
StageParams.PARAM_LOCKED: bool, # backard compatibility
StageParams.PARAM_LOCKED: bool, # backward compatibility
StageParams.PARAM_FROZEN: bool,
StageParams.PARAM_META: object,
StageParams.PARAM_ALWAYS_CHANGED: bool,
Expand All @@ -33,7 +34,14 @@
StageParams.PARAM_PARAMS: {str: {str: object}},
StageParams.PARAM_OUTS: [DATA_SCHEMA],
}
LOCKFILE_SCHEMA = {str: LOCK_FILE_STAGE_SCHEMA}

LOCKFILE_STAGES_SCHEMA = {str: LOCK_FILE_STAGE_SCHEMA}

LOCKFILE_V1_SCHEMA = LOCKFILE_STAGES_SCHEMA
LOCKFILE_V2_SCHEMA = {
STAGES: LOCKFILE_STAGES_SCHEMA,
META_KWD: {Required(VERSION_KWD): lockfile_version_schema},
}

OUT_PSTAGE_DETAILED_SCHEMA = {
str: {
Expand Down Expand Up @@ -100,4 +108,5 @@
COMPILED_SINGLE_STAGE_SCHEMA = Schema(SINGLE_STAGE_SCHEMA)
COMPILED_MULTI_STAGE_SCHEMA = Schema(MULTI_STAGE_SCHEMA)
COMPILED_LOCK_FILE_STAGE_SCHEMA = Schema(LOCK_FILE_STAGE_SCHEMA)
COMPILED_LOCKFILE_SCHEMA = Schema(LOCKFILE_SCHEMA)
COMPILED_LOCKFILE_V1_SCHEMA = Schema(LOCKFILE_V1_SCHEMA)
COMPILED_LOCKFILE_V2_SCHEMA = Schema(LOCKFILE_V2_SCHEMA)
9 changes: 8 additions & 1 deletion dvc/stage/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dvc import dependency, output
from dvc.hash_info import HashInfo
from dvc.parsing import FOREACH_KWD, JOIN, DataResolver, EntryNotFound
from dvc.parsing.versions import LOCKFILE_VERSION
from dvc.path_info import PathInfo

from . import PipelineStage, Stage, loads_from
Expand All @@ -24,7 +25,13 @@ def __init__(self, dvcfile, data, lockfile_data=None):
self.data = data or {}
self.stages_data = self.data.get("stages", {})
self.repo = self.dvcfile.repo
self._lockfile_data = lockfile_data or {}

lockfile_data = lockfile_data or {}
version = LOCKFILE_VERSION.from_dict(lockfile_data)
if version == LOCKFILE_VERSION.V1:
self._lockfile_data = lockfile_data
else:
self._lockfile_data = lockfile_data.get("stages", {})

@cached_property
def resolver(self):
Expand Down
10 changes: 7 additions & 3 deletions tests/func/test_dvcfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,12 @@ def test_remove_stage_lockfile(tmp_dir, dvc, run_copy):
lock_file = dvc_file._lockfile
assert dvc_file.exists()
assert lock_file.exists()
assert {"copy-bar-foobar", "copy-foo-bar"} == set(lock_file.load().keys())
assert {"copy-bar-foobar", "copy-foo-bar"} == set(
lock_file.load()["stages"].keys()
)
lock_file.remove_stage(stage)

assert ["copy-bar-foobar"] == list(lock_file.load().keys())
assert ["copy-bar-foobar"] == list(lock_file.load()["stages"].keys())

# sanity check
stage2.reload()
Expand Down Expand Up @@ -374,7 +376,9 @@ def test_dvcfile_dump_preserves_comments(tmp_dir, dvc):
],
)
def test_dvcfile_try_dumping_parametrized_stage(tmp_dir, dvc, data, name):
dump_yaml("dvc.yaml", {"stages": data, "vars": [{"foo": "foobar"}]})
dump_yaml(
"dvc.yaml", {"stages": data, "vars": [{"foo": "foobar"}]},
)

stage = dvc.stage.load_one(name=name)
dvcfile = stage.dvcfile
Expand Down
87 changes: 79 additions & 8 deletions tests/func/test_lockfile.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import logging
import os
from collections import OrderedDict
from operator import itemgetter

from dvc.dvcfile import PIPELINE_LOCK
import pytest

from dvc.dvcfile import PIPELINE_LOCK, Lockfile, LockfileCorruptedError
from dvc.hash_info import HashInfo
from dvc.stage.utils import split_params_deps
from dvc.utils.fs import remove
from dvc.utils.serialize import dumps_yaml, parse_yaml_for_update
from dvc.utils.serialize import (
dump_yaml,
dumps_yaml,
load_yaml,
parse_yaml_for_update,
)
from tests.func.test_run_multistage import supported_params

FS_STRUCTURE = {
Expand Down Expand Up @@ -37,7 +47,7 @@ def test_deps_outs_are_sorted_by_path(tmp_dir, dvc, run_head):
run_head(*deps, name="copy-first-line")

initial_content = read_lock_file()
lock = initial_content["copy-first-line"]
lock = initial_content["stages"]["copy-first-line"]

# lock stage key order:
assert list(lock.keys()) == ["cmd", "deps", "outs"]
Expand All @@ -54,9 +64,9 @@ def test_deps_outs_are_sorted_by_path(tmp_dir, dvc, run_head):
assert list(map(itemgetter("path"), lock["deps"])) == sorted(deps)

# outs are too
assert list(
map(itemgetter("path"), initial_content["copy-first-line"]["outs"])
) == [d + "-1" for d in sorted(deps)]
assert list(map(itemgetter("path"), lock["outs"])) == [
d + "-1" for d in sorted(deps)
]


def test_order_is_preserved_when_pipeline_order_changes(
Expand Down Expand Up @@ -98,7 +108,7 @@ def test_cmd_changes_other_orders_are_preserved(tmp_dir, dvc, run_head):
stage.cmd = " ".join(stage.cmd.split())
stage.dvcfile._dump_pipeline_file(stage)

initial_content["copy-first-line"]["cmd"] = stage.cmd
initial_content["stages"]["copy-first-line"]["cmd"] = stage.cmd

assert dvc.reproduce(stage.addressing) == [stage]

Expand All @@ -121,7 +131,7 @@ def test_params_dump(tmp_dir, dvc, run_head):
)

initial_content = read_lock_file()
lock = initial_content["copy-first-line"]
lock = initial_content["stages"]["copy-first-line"]

# lock stage key order:
assert list(lock.keys()) == ["cmd", "deps", "params", "outs"]
Expand Down Expand Up @@ -155,3 +165,64 @@ def test_params_dump(tmp_dir, dvc, run_head):
remove(item)
assert dvc.reproduce(stage.addressing) == [stage]
assert_eq_lockfile(initial_content, read_lock_file())


@pytest.fixture
def v1_repo_lock(tmp_dir, dvc):
    """Generates a repo having v1 format lockfile"""
    # NOTE(review): the size differs on Windows, presumably because of the
    # line ending written by ``echo`` there; confirm.
    out_size = 5 if os.name == "nt" else 4
    bar_hash = HashInfo(
        name="md5", size=out_size, value="c157a79031e1c40f85931829bc5fc552"
    )
    bar_out = {"path": "bar.txt", **bar_hash.to_dict()}
    lockdata = {
        "foo": {"cmd": "echo foo"},
        "bar": {"cmd": "echo bar>bar.txt", "outs": [bar_out]},
    }

    # Create the stages without executing them, then write the lockfile by
    # hand in the old flat layout (stage names at the top level).
    dvc.run(cmd="echo foo", name="foo", no_exec=True)
    dvc.run(cmd="echo bar>bar.txt", outs=["bar.txt"], name="bar", no_exec=True)
    dump_yaml(tmp_dir / "dvc.lock", lockdata)
    yield lockdata


def test_can_read_v1_lockfile(tmp_dir, dvc, v1_repo_lock):
    """Status can be computed for a repo whose lockfile is in the v1 layout."""
    expected_status = {
        "bar": [
            {"changed outs": {"bar.txt": "not in cache"}},
            "always changed",
        ],
        "foo": ["always changed"],
    }
    assert dvc.status() == expected_status


def test_migrates_v1_lockfile_to_v2_during_dump(
    tmp_dir, dvc, v1_repo_lock, caplog
):
    """Reproducing rewrites a v1 lockfile in the v2 layout and logs it."""
    caplog.clear()
    with caplog.at_level(logging.INFO, logger="dvc.dvcfile"):
        assert dvc.reproduce()

    migration_message = "Migrating lock file 'dvc.lock' from v1 to v2"
    assert migration_message in caplog.messages

    # The old top-level stage entries now live under "stages", next to "meta".
    on_disk = load_yaml(tmp_dir / "dvc.lock")
    assert on_disk == {"stages": v1_repo_lock, "meta": {"version": "2.0"}}


@pytest.mark.parametrize(
    "version_info",
    [{"version": "1.1"}, {"version": "2.1"}, {"version": "3.0"}],
)
def test_lockfile_invalid_versions(tmp_dir, dvc, version_info):
    """Loading a lockfile with an unsupported meta version reports corruption."""
    lockdata = {"meta": version_info, "stages": {"foo": {"cmd": "echo foo"}}}
    dump_yaml("dvc.lock", lockdata)

    with pytest.raises(LockfileCorruptedError) as exc_info:
        Lockfile(dvc, tmp_dir / "dvc.lock").load()

    error = exc_info.value
    bad_version = version_info["version"]
    # The underlying format error is chained as the cause of the
    # corruption error.
    expected_cause = (
        "'dvc.lock' format error: "
        f"invalid version {bad_version}, "
        "expected one of ['2.0'] for dictionary value @ "
        "data['meta']['version']"
    )
    assert str(error) == "Lockfile 'dvc.lock' is corrupted."
    assert str(error.__cause__) == expected_cause
Loading