Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: prepare for dvcx summon/publish #3251

Merged
merged 7 commits into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 100 additions & 133 deletions dvc/api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from builtins import open as builtin_open
import importlib
import os
import sys
from contextlib import contextmanager, _GeneratorContextManager as GCM
import threading

from funcy import wrap_with
from funcy import cached_property, lmap
import ruamel.yaml
from voluptuous import Schema, Required, Invalid

Expand All @@ -14,31 +11,6 @@
from dvc.external_repo import external_repo


SUMMON_FILE_SCHEMA = Schema(
{
Required("objects"): [
{
Required("name"): str,
"meta": dict,
Required("summon"): {
Required("type"): str,
"deps": [str],
str: object,
},
}
]
}
)
SUMMON_PYTHON_SCHEMA = Schema(
{
Required("type"): "python",
Required("call"): str,
"args": dict,
"deps": [str],
}
)


class SummonError(DvcException):
pass

Expand Down Expand Up @@ -120,126 +92,121 @@ def _make_repo(repo_url=None, rev=None):
yield repo


def summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml", args=None):
"""Instantiate an object described in the `summon_file`."""
with prepare_summon(
name, repo=repo, rev=rev, summon_file=summon_file
) as desc:
class SummonFile(object):
DEFAULT_FILENAME = "dvcsummon.yaml"
SCHEMA = Schema(
{
Required("dvc-objects", default={}): {
str: {
"meta": dict,
Required("summon"): {
Required("type"): str,
"deps": [str],
str: object,
},
}
}
}
)

def __init__(self, repo_obj, summon_file):
self.repo = repo_obj
self.filename = summon_file
self._path = os.path.join(self.repo.root_dir, summon_file)

@staticmethod
@contextmanager
def prepare(repo=None, rev=None, summon_file=None):
"""Does a couple of things every summon needs as a prerequisite:
clones the repo and parses the summon file.

Calling code is expected to complete the summon logic following
instructions stated in "summon" dict of the object spec.

Returns a SummonFile instance, which contains references to a Repo
object, named object specification and resolved paths to deps.
"""
summon_file = summon_file or SummonFile.DEFAULT_FILENAME
with _make_repo(repo, rev=rev) as _repo:
_require_dvc(_repo)
try:
yield SummonFile(_repo, summon_file)
except SummonError as exc:
raise SummonError(
str(exc) + " at '{}' in '{}'".format(summon_file, _repo)
) from exc.__cause__

@cached_property
def objects(self):
return self._read_yaml()["dvc-objects"]

def _read_yaml(self):
try:
summon_dict = SUMMON_PYTHON_SCHEMA(desc.obj["summon"])
with builtin_open(self._path, mode="r") as fd:
return self.SCHEMA(ruamel.yaml.safe_load(fd.read()))
except FileNotFoundError as exc:
raise SummonError("Summon file not found") from exc
except ruamel.yaml.YAMLError as exc:
raise SummonError("Failed to parse summon file") from exc
except Invalid as exc:
raise SummonError(str(exc)) from exc

_args = {**summon_dict.get("args", {}), **(args or {})}
return _invoke_method(summon_dict["call"], _args, desc.repo.root_dir)

raise SummonError(str(exc)) from None

@contextmanager
def prepare_summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml"):
"""Does a couple of things every summon needs as a prerequisite:
clones the repo, parses the summon file and pulls the deps.

Calling code is expected to complete the summon logic following
instructions stated in "summon" dict of the object spec.

Returns a SummonDesc instance, which contains references to a Repo object,
named object specification and resolved paths to deps.
"""
with _make_repo(repo, rev=rev) as _repo:
_require_dvc(_repo)
def _write_yaml(self, objects):
try:
path = os.path.join(_repo.root_dir, summon_file)
obj = _get_object_spec(name, path)
yield SummonDesc(_repo, obj)
except SummonError as exc:
raise SummonError(
str(exc) + " at '{}' in '{}'".format(summon_file, repo)
) from exc.__cause__


class SummonDesc:
def __init__(self, repo, obj):
self.repo = repo
self.obj = obj
self._pull_deps()

@property
def deps(self):
return [os.path.join(self.repo.root_dir, d) for d in self._deps]

@property
def _deps(self):
return self.obj["summon"].get("deps", [])

def _pull_deps(self):
if not self._deps:
return
with builtin_open(self._path, "w") as fd:
content = self.SCHEMA({"dvc-objects": objects})
ruamel.yaml.safe_dump(content, fd)
except Invalid as exc:
raise SummonError(str(exc)) from None

outs = [self.repo.find_out_by_relpath(d) for d in self._deps]
def abs(self, path):
return os.path.join(self.repo.root_dir, path)

with self.repo.state:
for out in outs:
self.repo.cloud.pull(out.get_used_cache())
out.checkout()
def pull(self, targets):
self.repo.pull([self.abs(target) for target in targets])

def pull_deps(self, dobj):
self.pull(dobj["summon"].get("deps", []))

def _get_object_spec(name, path):
"""
Given a summonable object's name, search for it on the given file
and return its description.
"""
try:
with builtin_open(path, "r") as fobj:
content = SUMMON_FILE_SCHEMA(ruamel.yaml.safe_load(fobj.read()))
objects = [x for x in content["objects"] if x["name"] == name]

if not objects:
raise SummonError("No object with name '{}'".format(name))
elif len(objects) >= 2:
def get(self, name):
"""
Given a summonable object's name, search for it this file
and return its description.
"""
if name not in self.objects:
raise SummonError(
"More than one object with name '{}'".format(name)
"No object with name '{}' in '{}'".format(name, self.filename)
)

return objects[0]

except FileNotFoundError as exc:
raise SummonError("Summon file not found") from exc
except ruamel.yaml.YAMLError as exc:
raise SummonError("Failed to parse summon file") from exc
except Invalid as exc:
raise SummonError(str(exc)) from exc
return self.objects[name]

def set(self, name, dobj, overwrite=True):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should separate pushing dvc-object to a remote repo and just writing an object to a repo.

I see a few scenarios:

  1. write a dvc-object to a summon file
  2. write and commit
  3. commit and push
  4. commit in a new branch and create a pull request (push to upstream)

The most important scenarios are (1) and (3). (4) is also good. And it is not clear if we need (3).

The current code implements (3). @Suor could you please incorporate (1) and (3) in this PR? Or it is better to do that in a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This https://github.com/iterative/dvcx/pull/7 should handle 1 and 3. 3 works with any existing branch. Not sure how 4 should be implemented, can you create an issue in dvcx for that?

if not os.path.exists(self._path):
self.objects = self.SCHEMA({})["dvc-objects"]

@wrap_with(threading.Lock())
def _invoke_method(call, args, path):
# XXX: Some issues with this approach:
# * Import will pollute sys.modules
# * sys.path manipulation is "theoretically" not needed,
# but tests are failing for an unknown reason.
cwd = os.getcwd()

try:
os.chdir(path)
sys.path.insert(0, path)
method = _import_string(call)
return method(**args)
finally:
os.chdir(cwd)
sys.path.pop(0)
if name in self.objects and not overwrite:
raise SummonError(
"There is an existing summonable object named '{}' in '{}:{}'."
" Use SummonFile.set(..., overwrite=True) to"
" overwrite it.".format(name, self.repo.url, self.filename)
)

self.objects[name] = dobj
self._write_yaml(self.objects)

def _import_string(import_name):
"""Imports an object based on a string.
Useful to delay import to not load everything on startup.
Use dotted notaion in `import_name`, e.g. 'dvc.remote.gs.RemoteGS'.
# Add deps and push to remote
deps = dobj["summon"].get("deps", [])
stages = []
if deps:
stages = self.repo.add(
lmap(self.abs, deps), fname=self.abs(name + ".dvc")
)
self.repo.push()

:return: imported object
"""
if "." in import_name:
module, obj = import_name.rsplit(".", 1)
else:
return importlib.import_module(import_name)
return getattr(importlib.import_module(module), obj)
# Create commit and push
self.repo.scm.add([self._path] + [stage.path for stage in stages])
self.repo.scm.commit("Add {} to {}".format(name, self.filename))
self.repo.scm.push()


def _require_dvc(repo):
Expand Down
3 changes: 3 additions & 0 deletions dvc/scm/git/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ def checkout(self, branch, create_new=False):
else:
self.repo.git.checkout(branch)

def push(self):
self.repo.remote().push()

def branch(self, branch):
self.repo.git.branch(branch)

Expand Down
25 changes: 8 additions & 17 deletions tests/func/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest

from dvc import api
from dvc.api import SummonError, UrlNotDvcRepoError
from dvc.api import SummonFile, SummonError, UrlNotDvcRepoError
from dvc.compat import fspath
from dvc.exceptions import FileMissingError
from dvc.main import main
Expand Down Expand Up @@ -145,9 +145,8 @@ def test_open_not_cached(dvc):

def test_summon(tmp_dir, dvc, erepo_dir):
objects = {
"objects": [
{
"name": "sum",
SummonFile.DOBJ_SECTION: {
"sum": {
"meta": {"description": "Add <x> to <number>"},
"summon": {
"type": "python",
Expand All @@ -156,20 +155,16 @@ def test_summon(tmp_dir, dvc, erepo_dir):
"deps": ["number"],
},
}
]
}
}

other_objects = copy.deepcopy(objects)
other_objects["objects"][0]["summon"]["args"]["x"] = 100

dup_objects = copy.deepcopy(objects)
dup_objects["objects"] *= 2
other_objects[SummonFile.DOBJ_SECTION]["sum"]["summon"]["args"]["x"] = 100

with erepo_dir.chdir():
erepo_dir.dvc_gen("number", "100", commit="Add number.dvc")
erepo_dir.scm_gen("dvcsummon.yaml", ruamel.yaml.dump(objects))
erepo_dir.scm_gen(SummonFile.DEF_NAME, ruamel.yaml.dump(objects))
erepo_dir.scm_gen("other.yaml", ruamel.yaml.dump(other_objects))
erepo_dir.scm_gen("dup.yaml", ruamel.yaml.dump(dup_objects))
erepo_dir.scm_gen("invalid.yaml", ruamel.yaml.dump({"name": "sum"}))
erepo_dir.scm_gen("not_yaml.yaml", "a: - this is not a YAML file")
erepo_dir.scm_gen(
Expand All @@ -189,18 +184,14 @@ def test_summon(tmp_dir, dvc, erepo_dir):
except SummonError as exc:
assert "Summon file not found" in str(exc)
assert "missing.yaml" in str(exc)
assert repo_url in str(exc)
# Fails
# assert repo_url in str(exc)
else:
pytest.fail("Did not raise on missing summon file")

with pytest.raises(SummonError, match=r"No object with name 'missing'"):
api.summon("missing", repo=repo_url)

with pytest.raises(
SummonError, match=r"More than one object with name 'sum'"
):
api.summon("sum", repo=repo_url, summon_file="dup.yaml")

with pytest.raises(SummonError, match=r"extra keys not allowed"):
api.summon("sum", repo=repo_url, summon_file="invalid.yaml")

Expand Down