introduce index #6300

Merged
merged 6 commits on Aug 5, 2021

10 changes: 6 additions & 4 deletions dvc/command/dag.py
@@ -36,26 +36,28 @@ def _collect_targets(repo, target, outs):
return [stage.addressing for stage, _ in pairs]

targets = []

outs_trie = repo.index.outs_trie
for stage, info in pairs:
if not info:
targets.extend([str(out) for out in stage.outs])
continue

for out in repo.outs_trie.itervalues(prefix=info.parts): # noqa: B301
for out in outs_trie.itervalues(prefix=info.parts): # noqa: B301
targets.extend(str(out))

return targets


def _transform(repo, outs):
def _transform(index, outs):
import networkx as nx

from dvc.stage import Stage

def _relabel(node) -> str:
return node.addressing if isinstance(node, Stage) else str(node)

G = repo.outs_graph if outs else repo.graph
G = index.outs_graph if outs else index.graph
return nx.relabel_nodes(G, _relabel, copy=True)


@@ -85,7 +87,7 @@ def _filter(G, targets, full):

def _build(repo, target=None, full=False, outs=False):
targets = _collect_targets(repo, target, outs)
G = _transform(repo, outs)
G = _transform(repo.index, outs)
return _filter(G, targets, full)
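
Aside (not part of the PR): the lookup above assumes `outs_trie` is a pygtrie-style trie keyed by output path components, so `itervalues(prefix=info.parts)` yields every tracked output under a directory target. A standalone sketch with made-up paths:

# Illustrative only -- mimics the outs_trie.itervalues(prefix=...) call
# in _collect_targets above, using hypothetical outputs.
from pygtrie import Trie

outs_trie = Trie()
outs_trie[("data", "raw.csv")] = "data/raw.csv"
outs_trie[("data", "prepared", "train.csv")] = "data/prepared/train.csv"
outs_trie[("models", "model.pkl")] = "models/model.pkl"

# Both outputs that live under data/ -- analogous to passing a directory
# target's PathInfo.parts as the prefix.
print(list(outs_trie.itervalues(prefix=("data",))))  # noqa: B301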


2 changes: 1 addition & 1 deletion dvc/command/stage.py
@@ -66,7 +66,7 @@ def prepare_stages_data(
class CmdStageList(CmdBase):
def _get_stages(self) -> Iterable["Stage"]:
if self.args.all:
stages: List["Stage"] = self.repo.stages # type: ignore
stages: List["Stage"] = self.repo.index.stages # type: ignore
logger.trace( # type: ignore[attr-defined]
"%d no. of stages found", len(stages)
)
2 changes: 1 addition & 1 deletion dvc/command/status.py
@@ -80,7 +80,7 @@ def run(self):
return 0

# additional hints for the user
if not self.repo.stages:
if not self.repo.index.stages:
ui.write(self.EMPTY_PROJECT_MSG)
elif self.args.cloud or self.args.remote:
remote = self.args.remote or self.repo.config["core"].get(
10 changes: 9 additions & 1 deletion dvc/objects/__init__.py
@@ -1,5 +1,5 @@
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Iterator, Union

from .tree import Tree

@@ -24,3 +24,11 @@ def load(odb: "ObjectDB", hash_info: "HashInfo") -> "HashFile":
if hash_info.isdir:
return Tree.load(odb, hash_info)
return odb.get(hash_info)


def iterobjs(
obj: Union["Tree", "HashFile"]
) -> Iterator[Union["Tree", "HashFile"]]:
if isinstance(obj, Tree):
yield from (entry_obj for _, entry_obj in obj)
yield obj
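
Aside (not part of the PR): the helper above is consumed later in this PR by Repo.used_objs (see the dvc/repo/__init__.py diff below). A small usage sketch; `objs` is a hypothetical collection of Tree/HashFile objects:

from itertools import chain

from dvc.objects import iterobjs


def flatten_objs(objs):
    # For a Tree, iterobjs yields each entry's object and then the Tree
    # itself; for a plain HashFile it yields just that file. Chaining the
    # results mirrors chain.from_iterable(map(iterobjs, objs)) in
    # Repo.used_objs._add_suffix below.
    return chain.from_iterable(map(iterobjs, objs))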
12 changes: 12 additions & 0 deletions dvc/output.py
@@ -1004,6 +1004,18 @@ def merge(self, ancestor, other):
def fspath(self):
return self.path_info.fspath

@property
def is_decorated(self) -> bool:
return self.is_metric or self.is_plot

@property
def is_metric(self) -> bool:
return bool(self.metric) or bool(self.live)

@property
def is_plot(self) -> bool:
return bool(self.plot)


ARTIFACT_SCHEMA = {
**CHECKSUMS_SCHEMA,
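
Aside (not part of the PR): one plausible use of the new properties is selecting metric and plot outputs from the outputs exposed by the new index; the helper below is illustrative only:

def decorated_outs(repo):
    # is_decorated groups metrics (including live metrics) and plots, so a
    # single check picks out every "decorated" output the index tracks.
    return [out for out in repo.index.outs if out.is_decorated]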
118 changes: 34 additions & 84 deletions dvc/repo/__init__.py
@@ -3,9 +3,9 @@
from collections import defaultdict
from contextlib import contextmanager
from functools import wraps
from typing import TYPE_CHECKING, Callable, Optional
from typing import TYPE_CHECKING, Callable, Optional, Set

from funcy import cached_property, cat
from funcy import cached_property

from dvc.exceptions import FileMissingError
from dvc.exceptions import IsADirectoryError as DvcIsADirectoryError
@@ -14,11 +14,9 @@
from dvc.path_info import PathInfo
from dvc.utils.fs import path_isin

from .graph import build_graph, build_outs_graph, get_pipelines
from .trie import build_outs_trie

if TYPE_CHECKING:
from dvc.fs.base import BaseFileSystem
from dvc.objects.file import HashFile
from dvc.scm import Base


@@ -57,7 +55,6 @@ class Repo:
DVC_DIR = ".dvc"

from dvc.repo.add import add
from dvc.repo.brancher import brancher
from dvc.repo.checkout import checkout
from dvc.repo.commit import commit
from dvc.repo.destroy import destroy
@@ -207,6 +204,12 @@ def __init__(
def __str__(self):
return self.url or self.root_dir

@cached_property
def index(self):
from dvc.repo.index import Index

return Index(self)

@staticmethod
def open(url, *args, **kwargs):
if url is None:
@@ -323,23 +326,10 @@ def _ignore(self):

self.scm.ignore_list(flist)

def check_modified_graph(self, new_stages, old_stages=None):
"""Generate graph including the new stage to check for errors"""
# Building graph might be costly for the ones with many DVC-files,
# so we provide this undocumented hack to skip it. See [1] for
# more details. The hack can be used as:
#
# repo = Repo(...)
# repo._skip_graph_checks = True
# repo.add(...)
#
# A user should care about not duplicating outs and not adding cycles,
# otherwise DVC might have an undefined behaviour.
#
# [1] https://github.com/iterative/dvc/issues/2671
if not getattr(self, "_skip_graph_checks", False):
existing_stages = self.stages if old_stages is None else old_stages
build_graph(existing_stages + new_stages)
def brancher(self, *args, **kwargs):
from dvc.repo.brancher import brancher

return brancher(self, *args, **kwargs)

def used_objs(
self,
@@ -373,16 +363,14 @@ def used_objs(
"""
used = defaultdict(set)

def _add_suffix(objs, suffix):
from dvc.objects.tree import Tree
def _add_suffix(objs: Set["HashFile"], suffix: str) -> None:
from itertools import chain

from dvc.objects import iterobjs

for obj in objs:
for obj in chain.from_iterable(map(iterobjs, objs)):
if obj.name is not None:
obj.name += suffix
if isinstance(obj, Tree):
for _, entry_obj in obj:
if entry_obj.name is not None:
entry_obj.name += suffix

for branch in self.brancher(
revs=revs,
@@ -391,25 +379,17 @@ def _add_suffix(objs, suffix):
all_commits=all_commits,
all_experiments=all_experiments,
):
targets = targets or [None]

pairs = cat(
self.stage.collect_granular(
target, recursive=recursive, with_deps=with_deps
)
for target in targets
)

for stage, filter_info in pairs:
for odb, objs in stage.get_used_objs(
remote=remote,
force=force,
jobs=jobs,
filter_info=filter_info,
).items():
if branch:
_add_suffix(objs, f" ({branch})")
used[odb].update(objs)
for odb, objs in self.index.used_objs(
targets,
remote=remote,
force=force,
jobs=jobs,
recursive=recursive,
with_deps=with_deps,
).items():
if branch:
_add_suffix(objs, f" ({branch})")
used[odb].update(objs)

if used_run_cache:
for odb, objs in self.stage_cache.get_used_objs(
@@ -419,39 +399,13 @@ def _add_suffix(objs, suffix):

return used

@cached_property
def outs_trie(self):
return build_outs_trie(self.stages)

@cached_property
def graph(self):
return build_graph(self.stages, self.outs_trie)

@cached_property
def outs_graph(self):
return build_outs_graph(self.graph, self.outs_trie)

@cached_property
def pipelines(self):
return get_pipelines(self.graph)

@cached_property
def stages(self):
"""
Walks down the root directory looking for Dvcfiles,
skipping the directories that are related with
any SCM (e.g. `.git`), DVC itself (`.dvc`), or directories
tracked by DVC (e.g. `dvc add data` would skip `data/`)

NOTE: For large repos, this could be an expensive
operation. Consider using some memoization.
"""
error_handler = self.stage_collection_error_handler
return self.stage.collect_repo(onerror=error_handler)
@property
def stages(self): # obsolete, only for backward-compatibility
return self.index.stages

def find_outs_by_path(self, path, outs=None, recursive=False, strict=True):
# using `outs_graph` to ensure graph checks are run
outs = outs or self.outs_graph
outs = outs or self.index.outs_graph

abs_path = os.path.abspath(path)
path_info = PathInfo(abs_path)
@@ -512,11 +466,7 @@ def close(self):
def _reset(self):
self.state.close()
self.scm._reset() # pylint: disable=protected-access
self.__dict__.pop("outs_trie", None)
self.__dict__.pop("outs_graph", None)
self.__dict__.pop("graph", None)
self.__dict__.pop("stages", None)
self.__dict__.pop("pipelines", None)
self.__dict__.pop("index", None)
self.__dict__.pop("dvcignore", None)

def __enter__(self):
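
Aside (not part of the diff shown here): dvc/repo/index.py itself is not among the changed files above, so the sketch below reconstructs part of the Index interface implied by its call sites in this PR (stages, outs, deps, graph, outs_graph, outs_trie, check_graph, update, add). Everything is inferred from usage and may differ from the actual implementation; used_objs is omitted for brevity.

from funcy import cached_property


class IndexSketch:
    """Assumed shape of the index introduced by this PR (illustrative)."""

    def __init__(self, repo, stages=None):
        self.repo = repo
        self._stages = stages

    @cached_property
    def stages(self):
        # Collect stages from the workspace, as the old Repo.stages did.
        if self._stages is not None:
            return self._stages
        onerror = self.repo.stage_collection_error_handler
        return self.repo.stage.collect_repo(onerror=onerror)

    @property
    def outs(self):
        for stage in self.stages:
            yield from stage.outs

    @property
    def deps(self):
        for stage in self.stages:
            yield from stage.deps

    @cached_property
    def outs_trie(self):
        from dvc.repo.trie import build_outs_trie

        return build_outs_trie(self.stages)

    @cached_property
    def graph(self):
        from dvc.repo.graph import build_graph

        return build_graph(self.stages, self.outs_trie)

    @cached_property
    def outs_graph(self):
        from dvc.repo.graph import build_outs_graph

        return build_outs_graph(self.graph, self.outs_trie)

    def check_graph(self):
        # Building the graph runs the output-duplication and cycle checks
        # that Repo.check_modified_graph() used to perform.
        _ = self.graph

    def update(self, stages):
        # Return a new index in which `stages` replace any existing stages
        # they shadow, matching how dvc/repo/add.py uses index.update() below.
        kept = [stage for stage in self.stages if stage not in set(stages)]
        return IndexSketch(self.repo, kept + list(stages))

    def add(self, stage):
        return self.update([stage])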
5 changes: 2 additions & 3 deletions dvc/repo/add.py
@@ -169,14 +169,13 @@ def add( # noqa: C901
desc = "Collecting targets"
stages_it = create_stages(repo, add_targets, fname, transfer, **kwargs)
stages = list(ui.progress(stages_it, desc=desc, unit="file"))

msg = "Collecting stages from the workspace"
with translate_graph_error(stages), ui.status(msg) as status:
# remove existing stages that are to-be replaced with these
# new stages for the graph checks.
old_stages = set(repo.stages) - set(stages)
new_index = repo.index.update(stages)
status.update("Checking graph")
repo.check_modified_graph(stages, list(old_stages))
new_index.check_graph()

odb = None
if to_remote:
8 changes: 1 addition & 7 deletions dvc/repo/checkout.py
@@ -28,13 +28,7 @@ def _fspath_dir(path):


def _remove_unused_links(repo):
used = [
out.fspath
for stage in repo.stages
for out in stage.outs
if out.scheme == "local"
]

used = [out.fspath for out in repo.index.outs if out.scheme == "local"]
unused = repo.state.get_unused_links(used, repo.fs)
ret = [_fspath_dir(u) for u in unused]
repo.state.remove_links(unused, repo.fs)
9 changes: 3 additions & 6 deletions dvc/repo/collect.py
@@ -20,12 +20,9 @@
def _collect_outs(
repo: "Repo", output_filter: FilterFn = None, deps: bool = False
) -> Outputs:
outs = [
out
for stage in repo.graph # using `graph` to ensure graph checks run
for out in (stage.deps if deps else stage.outs)
]
return list(filter(output_filter, outs)) if output_filter else outs
index = repo.index
index.check_graph() # ensure graph is correct
return list(filter(output_filter, index.deps if deps else index.outs))


def _collect_paths(
2 changes: 1 addition & 1 deletion dvc/repo/destroy.py
@@ -5,7 +5,7 @@

@locked
def _destroy_stages(repo):
for stage in repo.stages:
for stage in repo.index.stages:
stage.unprotect_outs()
stage.dvcfile.remove(force=True)

29 changes: 14 additions & 15 deletions dvc/repo/diff.py
@@ -142,21 +142,20 @@ def _to_checksum(output):
)[1].hash_info.value
return output.hash_info.value

for stage in repo.stages:
for output in stage.outs:
if _exists(output):
yield_output = targets is None or any(
output.path_info.isin_or_eq(target) for target in targets
)

if yield_output:
yield _to_path(output), _to_checksum(output)

if output.is_dir_checksum and (
yield_output
or any(target.isin(output.path_info) for target in targets)
):
yield from _dir_output_paths(repo_fs, output, targets)
for output in repo.index.outs:
if _exists(output):
yield_output = targets is None or any(
output.path_info.isin_or_eq(target) for target in targets
)

if yield_output:
yield _to_path(output), _to_checksum(output)

if output.is_dir_checksum and (
yield_output
or any(target.isin(output.path_info) for target in targets)
):
yield from _dir_output_paths(repo_fs, output, targets)


def _dir_output_paths(repo_fs, output, targets=None):
3 changes: 2 additions & 1 deletion dvc/repo/imp_url.py
@@ -67,7 +67,8 @@ def imp_url(
dvcfile.remove()

try:
self.check_modified_graph([stage])
new_index = self.index.add(stage)
new_index.check_graph()
except OutputDuplicationError as exc:
raise OutputDuplicationError(exc.output, set(exc.stages) - {stage})
