Skip to content

Commit

Permalink
introduce index (#6300)
Browse files Browse the repository at this point in the history
* introduce index

* migrate tests to use index

* migrate checkout/diff to use index.outs

* test for index

* fix typings

* add few more tests
  • Loading branch information
skshetry authored Aug 5, 2021
1 parent 3be395b commit 6c33fbb
Show file tree
Hide file tree
Showing 30 changed files with 701 additions and 208 deletions.
10 changes: 6 additions & 4 deletions dvc/command/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,28 @@ def _collect_targets(repo, target, outs):
return [stage.addressing for stage, _ in pairs]

targets = []

outs_trie = repo.index.outs_trie
for stage, info in pairs:
if not info:
targets.extend([str(out) for out in stage.outs])
continue

for out in repo.outs_trie.itervalues(prefix=info.parts): # noqa: B301
for out in outs_trie.itervalues(prefix=info.parts): # noqa: B301
targets.extend(str(out))

return targets


def _transform(repo, outs):
def _transform(index, outs):
import networkx as nx

from dvc.stage import Stage

def _relabel(node) -> str:
return node.addressing if isinstance(node, Stage) else str(node)

G = repo.outs_graph if outs else repo.graph
G = index.outs_graph if outs else index.graph
return nx.relabel_nodes(G, _relabel, copy=True)


Expand Down Expand Up @@ -85,7 +87,7 @@ def _filter(G, targets, full):

def _build(repo, target=None, full=False, outs=False):
targets = _collect_targets(repo, target, outs)
G = _transform(repo, outs)
G = _transform(repo.index, outs)
return _filter(G, targets, full)


Expand Down
2 changes: 1 addition & 1 deletion dvc/command/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def prepare_stages_data(
class CmdStageList(CmdBase):
def _get_stages(self) -> Iterable["Stage"]:
if self.args.all:
stages: List["Stage"] = self.repo.stages # type: ignore
stages: List["Stage"] = self.repo.index.stages # type: ignore
logger.trace( # type: ignore[attr-defined]
"%d no. of stages found", len(stages)
)
Expand Down
2 changes: 1 addition & 1 deletion dvc/command/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def run(self):
return 0

# additional hints for the user
if not self.repo.stages:
if not self.repo.index.stages:
ui.write(self.EMPTY_PROJECT_MSG)
elif self.args.cloud or self.args.remote:
remote = self.args.remote or self.repo.config["core"].get(
Expand Down
10 changes: 9 additions & 1 deletion dvc/objects/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Iterator, Union

from .tree import Tree

Expand All @@ -24,3 +24,11 @@ def load(odb: "ObjectDB", hash_info: "HashInfo") -> "HashFile":
if hash_info.isdir:
return Tree.load(odb, hash_info)
return odb.get(hash_info)


def iterobjs(
obj: Union["Tree", "HashFile"]
) -> Iterator[Union["Tree", "HashFile"]]:
if isinstance(obj, Tree):
yield from (entry_obj for _, entry_obj in obj)
yield obj
12 changes: 12 additions & 0 deletions dvc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,18 @@ def merge(self, ancestor, other):
def fspath(self):
return self.path_info.fspath

@property
def is_decorated(self) -> bool:
return self.is_metric or self.is_plot

@property
def is_metric(self) -> bool:
return bool(self.metric) or bool(self.live)

@property
def is_plot(self) -> bool:
return bool(self.plot)


ARTIFACT_SCHEMA = {
**CHECKSUMS_SCHEMA,
Expand Down
118 changes: 34 additions & 84 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from collections import defaultdict
from contextlib import contextmanager
from functools import wraps
from typing import TYPE_CHECKING, Callable, Optional
from typing import TYPE_CHECKING, Callable, Optional, Set

from funcy import cached_property, cat
from funcy import cached_property

from dvc.exceptions import FileMissingError
from dvc.exceptions import IsADirectoryError as DvcIsADirectoryError
Expand All @@ -14,11 +14,9 @@
from dvc.path_info import PathInfo
from dvc.utils.fs import path_isin

from .graph import build_graph, build_outs_graph, get_pipelines
from .trie import build_outs_trie

if TYPE_CHECKING:
from dvc.fs.base import BaseFileSystem
from dvc.objects.file import HashFile
from dvc.scm import Base


Expand Down Expand Up @@ -57,7 +55,6 @@ class Repo:
DVC_DIR = ".dvc"

from dvc.repo.add import add
from dvc.repo.brancher import brancher
from dvc.repo.checkout import checkout
from dvc.repo.commit import commit
from dvc.repo.destroy import destroy
Expand Down Expand Up @@ -207,6 +204,12 @@ def __init__(
def __str__(self):
return self.url or self.root_dir

@cached_property
def index(self):
from dvc.repo.index import Index

return Index(self)

@staticmethod
def open(url, *args, **kwargs):
if url is None:
Expand Down Expand Up @@ -323,23 +326,10 @@ def _ignore(self):

self.scm.ignore_list(flist)

def check_modified_graph(self, new_stages, old_stages=None):
"""Generate graph including the new stage to check for errors"""
# Building graph might be costly for the ones with many DVC-files,
# so we provide this undocumented hack to skip it. See [1] for
# more details. The hack can be used as:
#
# repo = Repo(...)
# repo._skip_graph_checks = True
# repo.add(...)
#
# A user should care about not duplicating outs and not adding cycles,
# otherwise DVC might have an undefined behaviour.
#
# [1] https://github.com/iterative/dvc/issues/2671
if not getattr(self, "_skip_graph_checks", False):
existing_stages = self.stages if old_stages is None else old_stages
build_graph(existing_stages + new_stages)
def brancher(self, *args, **kwargs):
from dvc.repo.brancher import brancher

return brancher(self, *args, **kwargs)

def used_objs(
self,
Expand Down Expand Up @@ -373,16 +363,14 @@ def used_objs(
"""
used = defaultdict(set)

def _add_suffix(objs, suffix):
from dvc.objects.tree import Tree
def _add_suffix(objs: Set["HashFile"], suffix: str) -> None:
from itertools import chain

from dvc.objects import iterobjs

for obj in objs:
for obj in chain.from_iterable(map(iterobjs, objs)):
if obj.name is not None:
obj.name += suffix
if isinstance(obj, Tree):
for _, entry_obj in obj:
if entry_obj.name is not None:
entry_obj.name += suffix

for branch in self.brancher(
revs=revs,
Expand All @@ -391,25 +379,17 @@ def _add_suffix(objs, suffix):
all_commits=all_commits,
all_experiments=all_experiments,
):
targets = targets or [None]

pairs = cat(
self.stage.collect_granular(
target, recursive=recursive, with_deps=with_deps
)
for target in targets
)

for stage, filter_info in pairs:
for odb, objs in stage.get_used_objs(
remote=remote,
force=force,
jobs=jobs,
filter_info=filter_info,
).items():
if branch:
_add_suffix(objs, f" ({branch})")
used[odb].update(objs)
for odb, objs in self.index.used_objs(
targets,
remote=remote,
force=force,
jobs=jobs,
recursive=recursive,
with_deps=with_deps,
).items():
if branch:
_add_suffix(objs, f" ({branch})")
used[odb].update(objs)

if used_run_cache:
for odb, objs in self.stage_cache.get_used_objs(
Expand All @@ -419,39 +399,13 @@ def _add_suffix(objs, suffix):

return used

@cached_property
def outs_trie(self):
return build_outs_trie(self.stages)

@cached_property
def graph(self):
return build_graph(self.stages, self.outs_trie)

@cached_property
def outs_graph(self):
return build_outs_graph(self.graph, self.outs_trie)

@cached_property
def pipelines(self):
return get_pipelines(self.graph)

@cached_property
def stages(self):
"""
Walks down the root directory looking for Dvcfiles,
skipping the directories that are related with
any SCM (e.g. `.git`), DVC itself (`.dvc`), or directories
tracked by DVC (e.g. `dvc add data` would skip `data/`)
NOTE: For large repos, this could be an expensive
operation. Consider using some memoization.
"""
error_handler = self.stage_collection_error_handler
return self.stage.collect_repo(onerror=error_handler)
@property
def stages(self): # obsolete, only for backward-compatibility
return self.index.stages

def find_outs_by_path(self, path, outs=None, recursive=False, strict=True):
# using `outs_graph` to ensure graph checks are run
outs = outs or self.outs_graph
outs = outs or self.index.outs_graph

abs_path = os.path.abspath(path)
path_info = PathInfo(abs_path)
Expand Down Expand Up @@ -512,11 +466,7 @@ def close(self):
def _reset(self):
self.state.close()
self.scm._reset() # pylint: disable=protected-access
self.__dict__.pop("outs_trie", None)
self.__dict__.pop("outs_graph", None)
self.__dict__.pop("graph", None)
self.__dict__.pop("stages", None)
self.__dict__.pop("pipelines", None)
self.__dict__.pop("index", None)
self.__dict__.pop("dvcignore", None)

def __enter__(self):
Expand Down
5 changes: 2 additions & 3 deletions dvc/repo/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,13 @@ def add( # noqa: C901
desc = "Collecting targets"
stages_it = create_stages(repo, add_targets, fname, transfer, **kwargs)
stages = list(ui.progress(stages_it, desc=desc, unit="file"))

msg = "Collecting stages from the workspace"
with translate_graph_error(stages), ui.status(msg) as status:
# remove existing stages that are to-be replaced with these
# new stages for the graph checks.
old_stages = set(repo.stages) - set(stages)
new_index = repo.index.update(stages)
status.update("Checking graph")
repo.check_modified_graph(stages, list(old_stages))
new_index.check_graph()

odb = None
if to_remote:
Expand Down
8 changes: 1 addition & 7 deletions dvc/repo/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,7 @@ def _fspath_dir(path):


def _remove_unused_links(repo):
used = [
out.fspath
for stage in repo.stages
for out in stage.outs
if out.scheme == "local"
]

used = [out.fspath for out in repo.index.outs if out.scheme == "local"]
unused = repo.state.get_unused_links(used, repo.fs)
ret = [_fspath_dir(u) for u in unused]
repo.state.remove_links(unused, repo.fs)
Expand Down
9 changes: 3 additions & 6 deletions dvc/repo/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@
def _collect_outs(
repo: "Repo", output_filter: FilterFn = None, deps: bool = False
) -> Outputs:
outs = [
out
for stage in repo.graph # using `graph` to ensure graph checks run
for out in (stage.deps if deps else stage.outs)
]
return list(filter(output_filter, outs)) if output_filter else outs
index = repo.index
index.check_graph() # ensure graph is correct
return list(filter(output_filter, index.deps if deps else index.outs))


def _collect_paths(
Expand Down
2 changes: 1 addition & 1 deletion dvc/repo/destroy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

@locked
def _destroy_stages(repo):
for stage in repo.stages:
for stage in repo.index.stages:
stage.unprotect_outs()
stage.dvcfile.remove(force=True)

Expand Down
29 changes: 14 additions & 15 deletions dvc/repo/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,20 @@ def _to_checksum(output):
)[1].hash_info.value
return output.hash_info.value

for stage in repo.stages:
for output in stage.outs:
if _exists(output):
yield_output = targets is None or any(
output.path_info.isin_or_eq(target) for target in targets
)

if yield_output:
yield _to_path(output), _to_checksum(output)

if output.is_dir_checksum and (
yield_output
or any(target.isin(output.path_info) for target in targets)
):
yield from _dir_output_paths(repo_fs, output, targets)
for output in repo.index.outs:
if _exists(output):
yield_output = targets is None or any(
output.path_info.isin_or_eq(target) for target in targets
)

if yield_output:
yield _to_path(output), _to_checksum(output)

if output.is_dir_checksum and (
yield_output
or any(target.isin(output.path_info) for target in targets)
):
yield from _dir_output_paths(repo_fs, output, targets)


def _dir_output_paths(repo_fs, output, targets=None):
Expand Down
3 changes: 2 additions & 1 deletion dvc/repo/imp_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def imp_url(
dvcfile.remove()

try:
self.check_modified_graph([stage])
new_index = self.index.add(stage)
new_index.check_graph()
except OutputDuplicationError as exc:
raise OutputDuplicationError(exc.output, set(exc.stages) - {stage})

Expand Down
Loading

0 comments on commit 6c33fbb

Please sign in to comment.