Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvcfs: detach from pipeline outputs #7353

Merged
merged 1 commit into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion dvc/data/meta.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Optional
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from dvc.objects.db import ObjectDB
from dvc.objects.file import HashFile


@dataclass
Expand All @@ -13,6 +17,10 @@ class Meta:
nfiles: Optional[int] = field(default=None)
isexec: Optional[bool] = field(default=False)

obj: Optional["HashFile"] = field(default=None)
odb: Optional["ObjectDB"] = field(default=None)
remote: Optional[str] = field(default=None)

@classmethod
def from_dict(cls, d):
if not d:
Expand Down
2 changes: 1 addition & 1 deletion dvc/data/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def _build_tree(fs_path, fs, name, **kwargs):
assert key
tree.add(key, meta, obj.hash_info)

tree_meta.size += meta.size
tree_meta.size += meta.size or 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excuse me, why can we not just set these to a default of 0? As in C or Java — the current None behaves just like a null pointer in other languages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karajan1001 This is a bit messy, but we have meta set to None for files inside directories added with `dvc add`. There seems to be value in separating the 0 and "not set" states, but you are right — we'll probably set it to defaults in the future.

tree_meta.nfiles += 1

return tree_meta, tree
Expand Down
49 changes: 45 additions & 4 deletions dvc/data/tree.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
import posixpath
from typing import TYPE_CHECKING, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Tuple

from funcy import cached_property

Expand All @@ -12,12 +12,29 @@

if TYPE_CHECKING:
from dvc.hash_info import HashInfo
from dvc.objects.db import ObjectDB

from .meta import Meta

logger = logging.getLogger(__name__)


def _try_load(
odbs: Iterable["ObjectDB"],
hash_info: "HashInfo",
) -> Optional["HashFile"]:
for odb in odbs:
if not odb:
continue

try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can reduce one line.

with suppress(FileNotFoundError, ObjectFormatError):

return Tree.load(odb, hash_info)
except (FileNotFoundError, ObjectFormatError):
pass

return None


class Tree(HashFile):
PARAM_RELPATH = "relpath"

Expand All @@ -26,7 +43,7 @@ def __init__(self, *args, **kwargs):
self._dict: Dict[Tuple[str], Tuple["Meta", "HashInfo"]] = {}

@cached_property
def _trie(self):
    """Prefix trie over the flat entry dict, built lazily and cached.

    Construction is O(n) over all entries, so it is deferred until the
    first prefix query and memoized afterwards.
    """
    from pygtrie import Trie

    trie = Trie(self._dict)
    return trie
Expand All @@ -52,6 +69,30 @@ def digest(self, hash_info: Optional["HashInfo"] = None):
assert self.hash_info.value
self.hash_info.value += ".dir"

def _load(self, key, meta, hash_info):
    """Lazily expand the directory entry at *key* into this tree.

    When *hash_info* refers to a .dir object that has not been loaded
    yet, try to fetch it from the entry's ODB or remote and splice its
    items under *key* into both the trie and the flat dict.
    """
    if not (hash_info and hash_info.isdir) or meta.obj:
        return

    obj = _try_load([meta.odb, meta.remote], hash_info)
    meta.obj = obj
    if not obj:
        return

    for ikey, entry in obj.iteritems():
        full_key = key + ikey
        self._trie[full_key] = entry
        self._dict[full_key] = entry

def iteritems(self, prefix=None):
    """Yield ``(key, (meta, hash_info))`` pairs, expanding lazy subtrees.

    Args:
        prefix: optional key tuple. When given, only entries under it are
            yielded, and the deepest existing ancestor entry is loaded
            first so that the requested subtree is materialized in the
            trie before iteration.

    Yields:
        ``(key, (meta, hash_info))`` for every (materialized) entry.
    """
    kwargs = {}
    # longest_prefix must only be consulted when a prefix was actually
    # given — calling it with None is meaningless and the pre-load step
    # is only needed to materialize the subtree containing *prefix*.
    if prefix:
        kwargs = {"prefix": prefix}
        item = self._trie.longest_prefix(prefix)
        if item:
            key, (meta, hash_info) = item
            self._load(key, meta, hash_info)

    for key, (meta, hash_info) in self._trie.iteritems(**kwargs):
        self._load(key, meta, hash_info)
        yield key, (meta, hash_info)

def shortest_prefix(self, *args, **kwargs):
    """Proxy ``pygtrie.Trie.shortest_prefix`` on the lazily-built trie."""
    trie = self._trie
    return trie.shortest_prefix(*args, **kwargs)

def __len__(self):
    """Number of entries currently materialized in the flat dict."""
    size = len(self._dict)
    return size

Expand Down Expand Up @@ -131,7 +172,7 @@ def filter(self, prefix: Tuple[str]) -> Optional["Tree"]:
"""
tree = Tree(self.fs_path, self.fs, self.hash_info)
try:
for key, (meta, oid) in self.trie.items(prefix):
for key, (meta, oid) in self._trie.items(prefix):
tree.add(key, meta, oid)
except KeyError:
pass
Expand All @@ -149,7 +190,7 @@ def get(self, odb, prefix: Tuple[str]) -> Optional[HashFile]:
tree = Tree(None, None, None)
depth = len(prefix)
try:
for key, (meta, entry_oid) in self.trie.items(prefix):
for key, (meta, entry_oid) in self._trie.items(prefix):
tree.add(key[depth:], meta, entry_oid)
except KeyError:
return None
Expand Down
180 changes: 65 additions & 115 deletions dvc/fs/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@
import os
import typing

from dvc.exceptions import OutputNotFoundError
from dvc.utils import relpath

from ._callback import DEFAULT_CALLBACK
from .base import FileSystem

if typing.TYPE_CHECKING:
from dvc.output import Output
from dvc.types import AnyPath

logger = logging.getLogger(__name__)
Expand All @@ -35,67 +31,44 @@ def __init__(self, **kwargs):
def config(self):
raise NotImplementedError

def _find_outs(self, path, *args, **kwargs):
outs = self.repo.find_outs_by_path(path, *args, **kwargs)
def _get_key(self, path):
from . import get_cloud_fs

def _is_cached(out):
return out.use_cache
cls, kwargs, fs_path = get_cloud_fs(None, url=path)

outs = list(filter(_is_cached, outs))
if not outs:
raise OutputNotFoundError(path, self.repo)
if not path.startswith(self.repo.root_dir):
fs = cls(**kwargs)
return (cls.scheme, *fs.path.parts(fs_path))

return outs
fs_key = "repo"
key = self.path.relparts(path, self.repo.root_dir)
if key == (".",):
key = ()

def _get_granular_hash(self, path: "AnyPath", out: "Output", remote=None):
# NOTE: use string paths here for performance reasons
key = tuple(relpath(path, out.fs_path).split(os.sep))
out.get_dir_cache(remote=remote)
if out.obj is None:
raise FileNotFoundError
(_, oid) = out.obj.trie.get(key) or (None, None)
if oid:
return oid
raise FileNotFoundError
return (fs_key, *key)

def _get_fs_path(self, path: "AnyPath", remote=None):
try:
outs = self._find_outs(path, strict=False)
except OutputNotFoundError as exc:
raise FileNotFoundError from exc
from dvc.config import NoRemoteError

if len(outs) != 1 or (
outs[0].is_dir_checksum and path == outs[0].fs_path
):
info = self.info(path)
if info["type"] == "directory":
raise IsADirectoryError

out = outs[0]

if not out.hash_info:
value = info.get("md5")
if not value:
raise FileNotFoundError

if out.changed_cache(filter_info=path):
from dvc.config import NoRemoteError

try:
remote_odb = self.repo.cloud.get_remote_odb(remote)
except NoRemoteError as exc:
raise FileNotFoundError from exc
if out.is_dir_checksum:
checksum = self._get_granular_hash(path, out).value
else:
checksum = out.hash_info.value
remote_fs_path = remote_odb.hash_to_path(checksum)
return remote_odb.fs, remote_fs_path

if out.is_dir_checksum:
checksum = self._get_granular_hash(path, out).value
cache_path = out.odb.fs.unstrip_protocol(
out.odb.hash_to_path(checksum)
)
else:
cache_path = out.cache_path
return out.odb.fs, cache_path
cache_path = self.repo.odb.local.hash_to_path(value)

if self.repo.odb.local.fs.exists(cache_path):
return self.repo.odb.local.fs, cache_path

try:
remote_odb = self.repo.cloud.get_remote_odb(remote)
except NoRemoteError as exc:
raise FileNotFoundError from exc
remote_fs_path = remote_odb.hash_to_path(value)
return remote_odb.fs, remote_fs_path

def open( # type: ignore
self, path: str, mode="r", encoding=None, **kwargs
Expand All @@ -122,37 +95,27 @@ def isfile(self, path): # pylint: disable=arguments-renamed
except FileNotFoundError:
return False

def _fetch_dir(self, out, **kwargs):
# pull dir cache if needed
out.get_dir_cache(**kwargs)

if not out.obj:
raise FileNotFoundError

def _add_dir(self, trie, out, **kwargs):
self._fetch_dir(out, **kwargs)

base = out.fs.path.parts(out.fs_path)
for key, _, _ in out.obj: # noqa: B301
trie[base + key] = None

def _walk(self, root, trie, topdown=True, **kwargs):
def _walk(self, root, topdown=True, **kwargs):
dirs = set()
files = []

root_parts = self.path.parts(root)
out = trie.get(root_parts)
if out and out.is_dir_checksum:
self._add_dir(trie, out, **kwargs)

root_parts = self._get_key(root)
root_len = len(root_parts)
try:
for key, out in trie.iteritems(prefix=root_parts): # noqa: B301
for key, (meta, hash_info) in self.repo.index.tree.iteritems(
prefix=root_parts
): # noqa: B301
if hash_info and hash_info.isdir and meta and not meta.obj:
raise FileNotFoundError

if key == root_parts:
continue

if hash_info.isdir:
continue

name = key[root_len]
if len(key) > root_len + 1 or (out and out.is_dir_checksum):
if len(key) > root_len + 1:
dirs.add(name)
continue

Expand All @@ -165,11 +128,9 @@ def _walk(self, root, trie, topdown=True, **kwargs):
yield root, dirs, files

for dname in dirs:
yield from self._walk(self.path.join(root, dname), trie)
yield from self._walk(self.path.join(root, dname))

def walk(self, top, topdown=True, onerror=None, **kwargs):
from pygtrie import Trie

assert topdown
root = os.path.abspath(top)
try:
Expand All @@ -184,14 +145,7 @@ def walk(self, top, topdown=True, onerror=None, **kwargs):
onerror(NotADirectoryError(top))
return

trie = Trie()
for out in info["outs"]:
trie[out.fs.path.parts(out.fs_path)] = out

if out.is_dir_checksum and self.path.isin_or_eq(root, out.fs_path):
self._add_dir(trie, out, **kwargs)

yield from self._walk(root, trie, topdown=topdown, **kwargs)
yield from self._walk(root, topdown=topdown, **kwargs)

def find(self, path, prefix=None):
for root, _, files in self.walk(path):
Expand All @@ -209,56 +163,52 @@ def isdvc(self, path, recursive=False, strict=True):
return bool(info.get("outs") if recurse else info.get("isout"))

def info(self, path):
from dvc.data.meta import Meta

abspath = os.path.abspath(path)

key = self._get_key(abspath)

try:
outs = self._find_outs(abspath, strict=False, recursive=True)
except OutputNotFoundError as exc:
outs = list(self.repo.index.tree.iteritems(key))
except KeyError as exc:
raise FileNotFoundError from exc

ret = {
"type": "file",
"outs": outs,
"size": 0,
"isexec": False,
"isdvc": False,
"outs": outs,
}

if len(outs) > 1:
if len(outs) > 1 and outs[0][0] != key:
shortest = self.repo.index.tree.shortest_prefix(key)
if shortest:
assert shortest[1][1].isdir
if len(shortest[0]) <= len(key):
ret["isdvc"] = True
Comment on lines +186 to +190
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These odd flags and logic will be removed in a followup, when we just use fs and dvcfs separately (with subrepos), instead of passing through tons of odd flags used inappropriately in ls and diff.


ret["type"] = "directory"
return ret

out = outs[0]
item_key, (meta, hash_info) = outs[0]

if not out.hash_info:
ret["isexec"] = out.meta.isexec
return ret

if abspath == out.fs_path:
if out.hash_info.isdir:
ret["type"] = "directory"
ret["size"] = out.meta.size
ret["isexec"] = out.meta.isexec
ret[out.hash_info.name] = out.hash_info.value
ret["isdvc"] = True
ret["isout"] = True
return ret
meta = meta or Meta()

if out.fs_path.startswith(abspath + self.sep):
if key != item_key:
assert item_key[: len(key)] == key
ret["type"] = "directory"
return ret

ret["size"] = meta.size
ret["isexec"] = meta.isexec
ret[hash_info.name] = hash_info.value
ret["isdvc"] = True

try:
self._get_granular_hash(abspath, out)
except FileNotFoundError:
ret["isout"] = True
ret["meta"] = meta
if hash_info and hash_info.isdir:
ret["type"] = "directory"
return ret

key = self.repo.fs.path.relparts(abspath, out.fs_path)
(_, oid) = out.obj.trie.get(key) or (None, None)
ret[oid.name] = oid.value
return ret

def get_file(
Expand Down
Loading