From 6a34b11ce1dec396377abe994c6f7839346241ae Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Thu, 27 Jan 2022 17:51:46 +0200 Subject: [PATCH] dvcfs: detach from pipeline outputs Introducing simple repo tree that combines all workspaces into one structure, and allows us to detach data representation from pipelines. Pre-requisite for dvc-data extraction and using relpaths in dvcfs/repofs. --- dvc/data/meta.py | 10 +- dvc/data/stage.py | 2 +- dvc/data/tree.py | 49 +++++++- dvc/fs/dvc.py | 180 ++++++++++------------------ dvc/fs/repo.py | 13 +- dvc/repo/diff.py | 10 +- dvc/repo/index.py | 30 +++++ tests/func/objects/db/test_index.py | 4 +- tests/func/test_ls.py | 18 +-- tests/func/test_remote.py | 2 +- tests/unit/fs/test_repo.py | 2 +- tests/unit/fs/test_repo_info.py | 47 ++------ 12 files changed, 189 insertions(+), 178 deletions(-) diff --git a/dvc/data/meta.py b/dvc/data/meta.py index 0284286a6c..8d21f1725d 100644 --- a/dvc/data/meta.py +++ b/dvc/data/meta.py @@ -1,6 +1,10 @@ from collections import OrderedDict from dataclasses import dataclass, field -from typing import Optional +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from dvc.objects.db import ObjectDB + from dvc.objects.file import HashFile @dataclass @@ -13,6 +17,10 @@ class Meta: nfiles: Optional[int] = field(default=None) isexec: Optional[bool] = field(default=False) + obj: Optional["HashFile"] = field(default=None) + odb: Optional["ObjectDB"] = field(default=None) + remote: Optional[str] = field(default=None) + @classmethod def from_dict(cls, d): if not d: diff --git a/dvc/data/stage.py b/dvc/data/stage.py index 992ad84379..4ab026dd5e 100644 --- a/dvc/data/stage.py +++ b/dvc/data/stage.py @@ -168,7 +168,7 @@ def _build_tree(fs_path, fs, name, **kwargs): assert key tree.add(key, meta, obj.hash_info) - tree_meta.size += meta.size + tree_meta.size += meta.size or 0 tree_meta.nfiles += 1 return tree_meta, tree diff --git a/dvc/data/tree.py b/dvc/data/tree.py index 02114a5e6f..25e29d8121 100644 --- a/dvc/data/tree.py +++ b/dvc/data/tree.py @@ -1,7 +1,7 @@ import json import logging import posixpath -from typing import TYPE_CHECKING, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, Optional, Tuple from funcy import cached_property @@ -12,12 +12,29 @@ if TYPE_CHECKING: from dvc.hash_info import HashInfo + from dvc.objects.db import ObjectDB from .meta import Meta logger = logging.getLogger(__name__) +def _try_load( + odbs: Iterable["ObjectDB"], + hash_info: "HashInfo", +) -> Optional["HashFile"]: + for odb in odbs: + if not odb: + continue + + try: + return Tree.load(odb, hash_info) + except (FileNotFoundError, ObjectFormatError): + pass + + return None + + class Tree(HashFile): PARAM_RELPATH = "relpath" @@ -26,7 +43,7 @@ def __init__(self, *args, **kwargs): self._dict: Dict[Tuple[str], Tuple["Meta", "HashInfo"]] = {} @cached_property - def trie(self): + def _trie(self): from pygtrie import Trie return Trie(self._dict) @@ -52,6 +69,30 @@ def digest(self, hash_info: Optional["HashInfo"] = None): assert self.hash_info.value self.hash_info.value += ".dir" + def _load(self, key, meta, hash_info): + if hash_info and hash_info.isdir and not meta.obj: + meta.obj = _try_load([meta.odb, meta.remote], hash_info) + if meta.obj: + for ikey, value in meta.obj.iteritems(): + self._trie[key + ikey] = value + self._dict[key + ikey] = value + + def iteritems(self, prefix=None): + kwargs = {} + if prefix: + kwargs = {"prefix": prefix} + item = self._trie.longest_prefix(prefix) + if item: + key, (meta, hash_info) = item + self._load(key, meta, hash_info) + + for key, (meta, hash_info) in self._trie.iteritems(**kwargs): + self._load(key, meta, hash_info) + yield key, (meta, hash_info) + + def shortest_prefix(self, *args, **kwargs): + return self._trie.shortest_prefix(*args, **kwargs) + def __len__(self): return len(self._dict) @@ -131,7 +172,7 @@ def filter(self, prefix: Tuple[str]) -> Optional["Tree"]: """ tree = Tree(self.fs_path, self.fs, self.hash_info) try: - for key, (meta, oid) in self.trie.items(prefix): + for key, (meta, oid) in self._trie.items(prefix): tree.add(key, meta, oid) except KeyError: pass @@ -149,7 +190,7 @@ def get(self, odb, prefix: Tuple[str]) -> Optional[HashFile]: tree = Tree(None, None, None) depth = len(prefix) try: - for key, (meta, entry_oid) in self.trie.items(prefix): + for key, (meta, entry_oid) in self._trie.items(prefix): tree.add(key[depth:], meta, entry_oid) except KeyError: return None diff --git a/dvc/fs/dvc.py b/dvc/fs/dvc.py index 2219a98c34..f0eacf5ba1 100644 --- a/dvc/fs/dvc.py +++ b/dvc/fs/dvc.py @@ -2,14 +2,10 @@ import os import typing -from dvc.exceptions import OutputNotFoundError -from dvc.utils import relpath - from ._callback import DEFAULT_CALLBACK from .base import FileSystem if typing.TYPE_CHECKING: - from dvc.output import Output from dvc.types import AnyPath logger = logging.getLogger(__name__) @@ -35,67 +31,44 @@ def __init__(self, **kwargs): def config(self): raise NotImplementedError - def _find_outs(self, path, *args, **kwargs): - outs = self.repo.find_outs_by_path(path, *args, **kwargs) + def _get_key(self, path): + from . import get_cloud_fs - def _is_cached(out): - return out.use_cache + cls, kwargs, fs_path = get_cloud_fs(None, url=path) - outs = list(filter(_is_cached, outs)) - if not outs: - raise OutputNotFoundError(path, self.repo) + if not path.startswith(self.repo.root_dir): + fs = cls(**kwargs) + return (cls.scheme, *fs.path.parts(fs_path)) - return outs + fs_key = "repo" + key = self.path.relparts(path, self.repo.root_dir) + if key == (".",): + key = () - def _get_granular_hash(self, path: "AnyPath", out: "Output", remote=None): - # NOTE: use string paths here for performance reasons - key = tuple(relpath(path, out.fs_path).split(os.sep)) - out.get_dir_cache(remote=remote) - if out.obj is None: - raise FileNotFoundError - (_, oid) = out.obj.trie.get(key) or (None, None) - if oid: - return oid - raise FileNotFoundError + return (fs_key, *key) def _get_fs_path(self, path: "AnyPath", remote=None): - try: - outs = self._find_outs(path, strict=False) - except OutputNotFoundError as exc: - raise FileNotFoundError from exc + from dvc.config import NoRemoteError - if len(outs) != 1 or ( - outs[0].is_dir_checksum and path == outs[0].fs_path - ): + info = self.info(path) + if info["type"] == "directory": raise IsADirectoryError - out = outs[0] - - if not out.hash_info: + value = info.get("md5") + if not value: raise FileNotFoundError - if out.changed_cache(filter_info=path): - from dvc.config import NoRemoteError - - try: - remote_odb = self.repo.cloud.get_remote_odb(remote) - except NoRemoteError as exc: - raise FileNotFoundError from exc - if out.is_dir_checksum: - checksum = self._get_granular_hash(path, out).value - else: - checksum = out.hash_info.value - remote_fs_path = remote_odb.hash_to_path(checksum) - return remote_odb.fs, remote_fs_path - - if out.is_dir_checksum: - checksum = self._get_granular_hash(path, out).value - cache_path = out.odb.fs.unstrip_protocol( - out.odb.hash_to_path(checksum) - ) - else: - cache_path = out.cache_path - return out.odb.fs, cache_path + cache_path = self.repo.odb.local.hash_to_path(value) + + if self.repo.odb.local.fs.exists(cache_path): + return self.repo.odb.local.fs, cache_path + + try: + remote_odb = self.repo.cloud.get_remote_odb(remote) + except NoRemoteError as exc: + raise FileNotFoundError from exc + remote_fs_path = remote_odb.hash_to_path(value) + return remote_odb.fs, remote_fs_path def open( # type: ignore self, path: str, mode="r", encoding=None, **kwargs @@ -122,37 +95,27 @@ def isfile(self, path): # pylint: disable=arguments-renamed except FileNotFoundError: return False - def _fetch_dir(self, out, **kwargs): - # pull dir cache if needed - out.get_dir_cache(**kwargs) - - if not out.obj: - raise FileNotFoundError - - def _add_dir(self, trie, out, **kwargs): - self._fetch_dir(out, **kwargs) - - base = out.fs.path.parts(out.fs_path) - for key, _, _ in out.obj: # noqa: B301 - trie[base + key] = None - - def _walk(self, root, trie, topdown=True, **kwargs): + def _walk(self, root, topdown=True, **kwargs): dirs = set() files = [] - root_parts = self.path.parts(root) - out = trie.get(root_parts) - if out and out.is_dir_checksum: - self._add_dir(trie, out, **kwargs) - + root_parts = self._get_key(root) root_len = len(root_parts) try: - for key, out in trie.iteritems(prefix=root_parts): # noqa: B301 + for key, (meta, hash_info) in self.repo.index.tree.iteritems( + prefix=root_parts + ): # noqa: B301 + if hash_info and hash_info.isdir and meta and not meta.obj: + raise FileNotFoundError + if key == root_parts: continue + if hash_info.isdir: + continue + name = key[root_len] - if len(key) > root_len + 1 or (out and out.is_dir_checksum): + if len(key) > root_len + 1: dirs.add(name) continue @@ -165,11 +128,9 @@ def _walk(self, root, trie, topdown=True, **kwargs): yield root, dirs, files for dname in dirs: - yield from self._walk(self.path.join(root, dname), trie) + yield from self._walk(self.path.join(root, dname)) def walk(self, top, topdown=True, onerror=None, **kwargs): - from pygtrie import Trie - assert topdown root = os.path.abspath(top) try: @@ -184,14 +145,7 @@ def walk(self, top, topdown=True, onerror=None, **kwargs): onerror(NotADirectoryError(top)) return - trie = Trie() - for out in info["outs"]: - trie[out.fs.path.parts(out.fs_path)] = out - - if out.is_dir_checksum and self.path.isin_or_eq(root, out.fs_path): - self._add_dir(trie, out, **kwargs) - - yield from self._walk(root, trie, topdown=topdown, **kwargs) + yield from self._walk(root, topdown=topdown, **kwargs) def find(self, path, prefix=None): for root, _, files in self.walk(path): @@ -209,56 +163,52 @@ def isdvc(self, path, recursive=False, strict=True): return bool(info.get("outs") if recurse else info.get("isout")) def info(self, path): + from dvc.data.meta import Meta + abspath = os.path.abspath(path) + key = self._get_key(abspath) + try: - outs = self._find_outs(abspath, strict=False, recursive=True) - except OutputNotFoundError as exc: + outs = list(self.repo.index.tree.iteritems(key)) + except KeyError as exc: raise FileNotFoundError from exc ret = { "type": "file", - "outs": outs, "size": 0, "isexec": False, "isdvc": False, + "outs": outs, } - if len(outs) > 1: + if len(outs) > 1 and outs[0][0] != key: + shortest = self.repo.index.tree.shortest_prefix(key) + if shortest: + assert shortest[1][1].isdir + if len(shortest[0]) <= len(key): + ret["isdvc"] = True + ret["type"] = "directory" return ret - out = outs[0] + item_key, (meta, hash_info) = outs[0] - if not out.hash_info: - ret["isexec"] = out.meta.isexec - return ret - - if abspath == out.fs_path: - if out.hash_info.isdir: - ret["type"] = "directory" - ret["size"] = out.meta.size - ret["isexec"] = out.meta.isexec - ret[out.hash_info.name] = out.hash_info.value - ret["isdvc"] = True - ret["isout"] = True - return ret + meta = meta or Meta() - if out.fs_path.startswith(abspath + self.sep): + if key != item_key: + assert item_key[: len(key)] == key ret["type"] = "directory" return ret + ret["size"] = meta.size + ret["isexec"] = meta.isexec + ret[hash_info.name] = hash_info.value ret["isdvc"] = True - - try: - self._get_granular_hash(abspath, out) - except FileNotFoundError: + ret["isout"] = True + ret["meta"] = meta + if hash_info and hash_info.isdir: ret["type"] = "directory" - return ret - - key = self.repo.fs.path.relparts(abspath, out.fs_path) - (_, oid) = out.obj.trie.get(key) or (None, None) - ret[oid.name] = oid.value return ret def get_file( diff --git a/dvc/fs/repo.py b/dvc/fs/repo.py index 0f57128b64..3941d324e3 100644 --- a/dvc/fs/repo.py +++ b/dvc/fs/repo.py @@ -479,14 +479,19 @@ def info(self, path): fs_info = fs.info(path) fs_info["repo"] = dvc_fs.repo - fs_info["outs"] = dvc_info.get("outs", None) if dvc_info else None fs_info["isout"] = ( dvc_info.get("isout", False) if dvc_info else False ) + fs_info["outs"] = dvc_info["outs"] if dvc_info else None fs_info["isdvc"] = dvc_info["isdvc"] if dvc_info else False - fs_info["isexec"] = ( - dvc_info["isexec"] if dvc_info else is_exec(fs_info["mode"]) - ) + fs_info["meta"] = dvc_info.get("meta") if dvc_info else None + + isexec = False + if dvc_info: + isexec = dvc_info["isexec"] + elif fs_info["type"] == "file": + isexec = is_exec(fs_info["mode"]) + fs_info["isexec"] = isexec return fs_info except FileNotFoundError: diff --git a/dvc/repo/diff.py b/dvc/repo/diff.py index aefda984d8..6bd5480ec8 100644 --- a/dvc/repo/diff.py +++ b/dvc/repo/diff.py @@ -190,10 +190,12 @@ def _filter_missing(repo_fs, paths): for path in paths: try: info = repo_fs.info(path) - if info["isdvc"]: - out = info["outs"][0] - if out.status().get(str(out)) == "not in cache": - yield path + if ( + info["isdvc"] + and info["type"] == "directory" + and not info["meta"].obj + ): + yield path except FileNotFoundError: pass diff --git a/dvc/repo/index.py b/dvc/repo/index.py index ad07c99cb3..ccf667b0bb 100644 --- a/dvc/repo/index.py +++ b/dvc/repo/index.py @@ -19,6 +19,7 @@ from networkx import DiGraph from pygtrie import Trie + from dvc.data.tree import Tree from dvc.dependency import Dependency, ParamsDependency from dvc.fs.base import FileSystem from dvc.hash_info import HashInfo @@ -159,6 +160,35 @@ def outs_graph(self) -> "DiGraph": return build_outs_graph(self.graph, self.outs_trie) + @cached_property + def tree(self) -> "Tree": + from dvc.config import NoRemoteError + from dvc.data.tree import Tree + + tree = Tree(None, None, None) + + for out in self.outs: + if not out.use_cache: + continue + + if out.is_in_repo: + fs_key = "repo" + key = self.repo.fs.path.relparts( + out.fs_path, self.repo.root_dir + ) + else: + fs_key = out.fs.scheme + key = out.fs.path.parts(out.fs_path) + + out.meta.odb = out.odb + try: + out.meta.remote = self.repo.cloud.get_remote_odb(out.remote) + except NoRemoteError: + out.meta.remote = None + tree.add((fs_key,) + key, out.meta, out.hash_info) + + return tree + def used_objs( self, targets: "TargetType" = None, diff --git a/tests/func/objects/db/test_index.py b/tests/func/objects/db/test_index.py index fd41aafc3e..6df598fe58 100644 --- a/tests/func/objects/db/test_index.py +++ b/tests/func/objects/db/test_index.py @@ -16,7 +16,7 @@ def index(dvc, local_remote, mocker): def test_indexed_on_status(tmp_dir, dvc, index): foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] - baz_hash = bar.obj.trie.get(("baz",))[1] + baz_hash = bar.obj._trie.get(("baz",))[1] clean_staging() dvc.push() index.clear() @@ -30,7 +30,7 @@ def test_indexed_on_status(tmp_dir, dvc, index): def test_indexed_on_push(tmp_dir, dvc, index): foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] bar = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0] - baz_hash = bar.obj.trie.get(("baz",))[1] + baz_hash = bar.obj._trie.get(("baz",))[1] clean_staging() dvc.push() diff --git a/tests/func/test_ls.py b/tests/func/test_ls.py index 68e735ef59..4701375529 100644 --- a/tests/func/test_ls.py +++ b/tests/func/test_ls.py @@ -473,14 +473,14 @@ def test_ls_granular(erepo_dir): entries = Repo.ls(os.fspath(erepo_dir), os.path.join("dir", "subdir")) assert entries == [ - {"isout": False, "isdir": False, "isexec": False, "path": "bar"}, - {"isout": False, "isdir": False, "isexec": False, "path": "foo"}, + {"isout": True, "isdir": False, "isexec": False, "path": "bar"}, + {"isout": True, "isdir": False, "isexec": False, "path": "foo"}, ] entries = Repo.ls(os.fspath(erepo_dir), "dir") assert entries == [ - {"isout": False, "isdir": False, "isexec": False, "path": "1"}, - {"isout": False, "isdir": False, "isexec": False, "path": "2"}, + {"isout": True, "isdir": False, "isexec": False, "path": "1"}, + {"isout": True, "isdir": False, "isexec": False, "path": "2"}, {"isout": False, "isdir": True, "isexec": False, "path": "subdir"}, ] @@ -500,18 +500,20 @@ def test_ls_target(erepo_dir, use_scm): commit="create dir", ) + isout = not use_scm + def _ls(path): return Repo.ls(os.fspath(erepo_dir), path) assert _ls(os.path.join("dir", "1")) == [ - {"isout": False, "isdir": False, "isexec": False, "path": "1"} + {"isout": isout, "isdir": False, "isexec": False, "path": "1"} ] assert _ls(os.path.join("dir", "subdir", "foo")) == [ - {"isout": False, "isdir": False, "isexec": False, "path": "foo"} + {"isout": isout, "isdir": False, "isexec": False, "path": "foo"} ] assert _ls(os.path.join("dir", "subdir")) == [ - {"isdir": False, "isexec": 0, "isout": False, "path": "bar"}, - {"isdir": False, "isexec": 0, "isout": False, "path": "foo"}, + {"isdir": False, "isexec": 0, "isout": isout, "path": "bar"}, + {"isdir": False, "isexec": 0, "isout": isout, "path": "foo"}, ] diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index c2a43e2503..ff844ed7e3 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -271,7 +271,7 @@ def test_push_order(tmp_dir, dvc, tmp_path_factory, mocker, local_remote): # foo .dir file should be uploaded after bar odb = dvc.cloud.get_remote_odb("upstream") foo_path = odb.hash_to_path(foo.hash_info.value) - bar_path = odb.hash_to_path(foo.obj.trie[("bar",)][1].value) + bar_path = odb.hash_to_path(foo.obj._trie[("bar",)][1].value) paths = [args[3] for args, _ in mocked_upload.call_args_list] assert paths.index(foo_path) > paths.index(bar_path) diff --git a/tests/unit/fs/test_repo.py b/tests/unit/fs/test_repo.py index 4bfe24c10c..94e324bf4d 100644 --- a/tests/unit/fs/test_repo.py +++ b/tests/unit/fs/test_repo.py @@ -321,7 +321,7 @@ def test_isdvc(tmp_dir, dvc): assert fs.isdvc("foo") assert not fs.isdvc("bar") assert fs.isdvc("dir") - assert not fs.isdvc(os.path.join("dir", "baz")) + assert fs.isdvc(os.path.join("dir", "baz")) assert fs.isdvc(os.path.join("dir", "baz"), recursive=True) diff --git a/tests/unit/fs/test_repo_info.py b/tests/unit/fs/test_repo_info.py index faf4239a29..bc7b285d56 100644 --- a/tests/unit/fs/test_repo_info.py +++ b/tests/unit/fs/test_repo_info.py @@ -65,38 +65,25 @@ def test_info_git_tracked_file(repo_fs, path): assert not info["isdvc"] assert info["type"] == "file" assert not info["isexec"] - assert not info["outs"] @pytest.mark.parametrize( - "path, outs", + "path", [ - (os.path.join("data", "raw", "raw-1.csv"), ["data"]), - (os.path.join("data", "raw", "raw-2.csv"), ["data"]), - ( - os.path.join("data", "processed", "processed-1.csv"), - ["data"], - ), - ( - os.path.join("data", "processed", "processed-2.csv"), - ["data"], - ), - ( - os.path.join("models", "transform.pickle"), - [os.path.join("models", "transform.pickle")], - ), + os.path.join("data", "raw", "raw-1.csv"), + os.path.join("data", "raw", "raw-2.csv"), + os.path.join("data", "processed", "processed-1.csv"), + os.path.join("data", "processed", "processed-2.csv"), + os.path.join("models", "transform.pickle"), ], ) -def test_info_dvc_tracked_file(repo_fs, path, outs): +def test_info_dvc_tracked_file(repo_fs, path): info = repo_fs.info(path) assert info["repo"].root_dir == repo_fs.root_dir assert info["isdvc"] assert info["type"] == "file" assert not info["isexec"] - assert {out.fs_path for out in info["outs"]} == { - os.path.join(repo_fs.root_dir, out) for out in outs - } @pytest.mark.parametrize("path", ["src", os.path.join("src", "utils")]) @@ -106,18 +93,11 @@ def test_info_git_only_dirs(repo_fs, path): assert info["repo"].root_dir == repo_fs.root_dir assert not info["isdvc"] assert info["type"] == "directory" - assert info["isexec"] - assert not info["outs"] + assert not info["isexec"] -@pytest.mark.parametrize( - "path, expected_outs", - [ - (".", ["data", os.path.join("models", "transform.pickle")]), - ("models", [os.path.join("models", "transform.pickle")]), - ], -) -def test_info_git_dvc_mixed_dirs(repo_fs, path, expected_outs): +@pytest.mark.parametrize("path", [".", "models"]) +def test_info_git_dvc_mixed_dirs(repo_fs, path): info = repo_fs.info(os.path.join(repo_fs.root_dir, path)) assert info["repo"].root_dir == repo_fs.root_dir @@ -125,10 +105,6 @@ def test_info_git_dvc_mixed_dirs(repo_fs, path, expected_outs): assert info["type"] == "directory" assert not info["isexec"] - assert {out.fs_path for out in info["outs"]} == { - os.path.join(repo_fs.root_dir, out) for out in expected_outs - } - @pytest.mark.parametrize( "path", @@ -145,9 +121,6 @@ def test_info_dvc_only_dirs(repo_fs, path): assert info["isdvc"] assert info["type"] == "directory" assert not info["isexec"] - assert {out.fs_path for out in info["outs"]} == { - os.path.join(repo_fs.root_dir, "data") - } def test_info_on_subrepos(make_tmp_dir, tmp_dir, dvc, scm, repo_fs):