Skip to content

Commit

Permalink
objects: use object IDs and references instead of naive objs in status/transfer (#6360)
Browse files Browse the repository at this point in the history

* objects.status: use object IDs instead of naive objects

* objects.transfer: use object IDs instead of naive objects

* objects: add ReferenceHashFile

* odb: add ReferenceObjectDB

* repo: collect used object IDs instead of naive objs

* support basic/naive fs serialization

* objects.stage: stage file objects in addition to trees

* objects: remove save(), update transfer()

* output/dep: update for osave removal

* path_info: support both url and path attributes

* status/gc: use object IDs instead of naive objs

* diff: update staging usage

* commit: update staging usage

* tests: clean staging when needed

* tests: update for save/transfer/stage changes

* dvcfs: don't allow config serialization (repofs should be used instead)

* utils: return mtime as str, size as int

* tests: update unit tests for save/stage/transfer changes

* objects: use Repo.open when needed in ReferenceHashFile

* fix ls test staging

* objects.stage: don't skip hash check when loading from state

- individual hashes will still be loaded from state if they are still
  valid

* ReferenceHashFile: serialize repo fs paths relative to repo root

* preserve state when staging

* objects.tree: skip hash computation in digest() when we have already
loaded hash from state

* always check file existence in referencehashfile

* update repofs test since stage no longer allows nonexistent files

* odb: only protect on check() if hash was verified

* staging: use unique memfs namespace for staging per dest ODB

* tests: clean full staging namespace

* refodb: preserve repofs cache dir

* repodependency: stage imports to local ODB and strict check circular
imports

* fetch: fetch staging last

* staging: make dest odb mandatory and preserve state

* refodb: cache nonlocal fs's

* fix rebase errors

* repofs: support full instantiation from fs config

* repofs: preserve subrepo factory for erepos

* tests: clarify repofs dirty file behavior
  • Loading branch information
pmrowla authored Aug 4, 2021
1 parent 34697ef commit 114a07e
Show file tree
Hide file tree
Showing 42 changed files with 942 additions and 520 deletions.
8 changes: 6 additions & 2 deletions dvc/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def _changed(path_info, fs, obj, cache, state=None):
actual = state.get(path_info, fs) if state else None
if actual is None:
try:
actual = stage(cache, path_info, fs, obj.hash_info.name).hash_info
actual = stage(
cache, path_info, fs, obj.hash_info.name, dry_run=True
)[1].hash_info
except FileNotFoundError:
logger.debug("'%s' doesn't exist.", path_info)
return True
Expand All @@ -61,7 +63,9 @@ def _remove(path_info, fs, cache, force=False):
fs.remove(path_info)
return

current = stage(cache, path_info, fs, fs.PARAM_CHECKSUM).hash_info
current = stage(cache, path_info, fs, fs.PARAM_CHECKSUM, dry_run=True)[
1
].hash_info
try:
obj = load(cache, current)
check(cache, obj)
Expand Down
8 changes: 4 additions & 4 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from dvc.objects.db import get_index

if TYPE_CHECKING:
from dvc.hash_info import HashInfo
from dvc.objects.db.base import ObjectDB
from dvc.objects.file import HashFile

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,7 +64,7 @@ def _init_odb(self, name):

def push(
self,
objs: Iterable["HashFile"],
objs: Iterable["HashInfo"],
jobs: Optional[int] = None,
remote: Optional[str] = None,
odb: Optional["ObjectDB"] = None,
Expand Down Expand Up @@ -93,7 +93,7 @@ def push(

def pull(
self,
objs: Iterable["HashFile"],
objs: Iterable["HashInfo"],
jobs: Optional[int] = None,
remote: Optional[str] = None,
odb: Optional["ObjectDB"] = None,
Expand Down Expand Up @@ -123,7 +123,7 @@ def pull(

def status(
self,
objs: Iterable["HashFile"],
objs: Iterable["HashInfo"],
jobs: Optional[int] = None,
remote: Optional[str] = None,
odb: Optional["ObjectDB"] = None,
Expand Down
68 changes: 47 additions & 21 deletions dvc/dependency/repo.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import os
from collections import defaultdict
from copy import copy
from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple

from funcy import first
from voluptuous import Required

from dvc.path_info import PathInfo

from .base import Dependency

if TYPE_CHECKING:
from dvc.hash_info import HashInfo
from dvc.objects.db.base import ObjectDB
from dvc.objects.file import HashFile

Expand Down Expand Up @@ -91,31 +92,32 @@ def changed_checksum(self):

def get_used_objs(
self, **kwargs
) -> Dict[Optional["ObjectDB"], Set["HashFile"]]:
) -> Dict[Optional["ObjectDB"], Set["HashInfo"]]:
used, _ = self._get_used_and_obj(**kwargs)
return used

def _get_used_and_obj(
self, obj_only=False, **kwargs
) -> Tuple[Dict[Optional["ObjectDB"], Set["HashFile"]], "HashFile"]:
) -> Tuple[Dict[Optional["ObjectDB"], Set["HashInfo"]], "HashFile"]:
from dvc.config import NoRemoteError
from dvc.exceptions import NoOutputOrStageError, PathMissingError
from dvc.objects.stage import get_staging, stage
from dvc.objects.stage import stage
from dvc.objects.tree import Tree

local_odb = self.repo.odb.local
locked = kwargs.pop("locked", True)
with self._make_repo(
locked=locked, cache_dir=local_odb.cache_dir
) as repo:
used_objs = defaultdict(set)
used_obj_ids = defaultdict(set)
rev = repo.get_rev()
if locked and self.def_repo.get(self.PARAM_REV_LOCK) is None:
self.def_repo[self.PARAM_REV_LOCK] = rev

path_info = PathInfo(repo.root_dir) / str(self.def_path)
if not obj_only:
try:
for odb, objs in repo.used_objs(
for odb, obj_ids in repo.used_objs(
[os.fspath(path_info)],
force=True,
jobs=kwargs.get("jobs"),
Expand All @@ -124,14 +126,14 @@ def _get_used_and_obj(
if odb is None:
odb = repo.cloud.get_remote_odb()
odb.read_only = True
self._check_circular_import(objs)
used_objs[odb].update(objs)
self._check_circular_import(odb, obj_ids)
used_obj_ids[odb].update(obj_ids)
except (NoRemoteError, NoOutputOrStageError):
pass

try:
staged_obj = stage(
None,
staging, staged_obj = stage(
local_odb,
path_info,
repo.repo_fs,
local_odb.fs.PARAM_CHECKSUM,
Expand All @@ -140,27 +142,51 @@ def _get_used_and_obj(
raise PathMissingError(
self.def_path, self.def_repo[self.PARAM_URL]
) from exc
staging = get_staging()
staging = copy(staging)
staging.read_only = True

self._staged_objs[rev] = staged_obj
used_objs[staging].add(staged_obj)
return used_objs, staged_obj
used_obj_ids[staging].add(staged_obj.hash_info)
if isinstance(staged_obj, Tree):
used_obj_ids[staging].update(
entry.hash_info for _, entry in staged_obj
)
return used_obj_ids, staged_obj

def _check_circular_import(self, objs):
def _check_circular_import(self, odb, obj_ids):
from dvc.exceptions import CircularImportError
from dvc.fs.repo import RepoFileSystem
from dvc.objects.db.reference import ReferenceObjectDB
from dvc.objects.tree import Tree

obj = first(objs)
if isinstance(obj, Tree):
_, obj = first(obj)
if not isinstance(obj.fs, RepoFileSystem):
if not isinstance(odb, ReferenceObjectDB):
return

self_url = self.repo.url or self.repo.root_dir
if obj.fs.repo_url is not None and obj.fs.repo_url == self_url:
raise CircularImportError(self, obj.fs.repo_url, self_url)
def iter_objs():
for hash_info in obj_ids:
if hash_info.isdir:
tree = Tree.load(odb, hash_info)
yield from (odb.get(entry.hash_info) for _, entry in tree)
else:
yield odb.get(hash_info)

checked_urls = set()
for obj in iter_objs():
if not isinstance(obj.fs, RepoFileSystem):
continue
if (
obj.fs.repo_url in checked_urls
or obj.fs.root_dir in checked_urls
):
continue
self_url = self.repo.url or self.repo.root_dir
if (
obj.fs.repo_url is not None
and obj.fs.repo_url == self_url
or obj.fs.root_dir == self.repo.root_dir
):
raise CircularImportError(self, obj.fs.repo_url, self_url)
checked_urls.update([obj.fs.repo_url, obj.fs.root_dir])

def get_obj(self, filter_info=None, **kwargs):
locked = kwargs.get("locked", True)
Expand Down
22 changes: 13 additions & 9 deletions dvc/external_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,14 @@ def external_repo(
config = _get_remote_config(url) if os.path.isdir(url) else {}
config.update(cache_config)

def make_repo(path, **_kwargs):
_config = cache_config.copy()
if os.path.isdir(url):
rel = os.path.relpath(path, _kwargs["scm"].root_dir)
repo_path = os.path.join(url, rel)
_config.update(_get_remote_config(repo_path))
return Repo(path, config=_config, **_kwargs)

root_dir = path if for_write else os.path.realpath(path)
repo_kwargs = dict(
root_dir=root_dir,
url=url,
scm=None if for_write else Git(root_dir),
rev=None if for_write else rev,
config=config,
repo_factory=make_repo,
repo_factory=erepo_factory(url, cache_config),
**kwargs,
)

Expand Down Expand Up @@ -87,6 +79,18 @@ def make_repo(path, **_kwargs):
_remove(path)


def erepo_factory(url, cache_config, *args, **kwargs):
    """Build a repo factory bound to an external repo URL and cache config.

    The returned callable takes a repo ``path`` plus keyword arguments and
    returns ``Repo(path, config=..., **_kwargs)``. The config starts as a
    copy of ``cache_config``; when ``url`` is a local directory it is
    updated with ``_get_remote_config`` of the matching path under ``url``.

    ``*args`` and ``**kwargs`` are accepted but not used.
    """

    def make_repo(path, **_kwargs):
        repo_config = cache_config.copy()
        if os.path.isdir(url):
            # Map the repo path (relative to the scm root) onto the
            # original local URL to look up that repo's remote config.
            relative = os.path.relpath(path, _kwargs["scm"].root_dir)
            origin_path = os.path.join(url, relative)
            repo_config.update(_get_remote_config(origin_path))
        return Repo(path, config=repo_config, **_kwargs)

    return make_repo


CLONES: Dict[str, str] = {}
CACHE_DIRS: Dict[str, str] = {}

Expand Down
5 changes: 3 additions & 2 deletions dvc/fs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
}


def get_fs_cls(remote_conf):
scheme = urlparse(remote_conf["url"]).scheme
def get_fs_cls(remote_conf, scheme=None):
    """Return the filesystem class registered in ``FS_MAP`` for a scheme.

    A truthy ``scheme`` is used as given; otherwise the scheme is parsed
    from ``remote_conf["url"]``. Schemes without an ``FS_MAP`` entry fall
    back to ``LocalFileSystem``.
    """
    resolved = scheme or urlparse(remote_conf["url"]).scheme
    return FS_MAP.get(resolved, LocalFileSystem)


Expand Down
5 changes: 5 additions & 0 deletions dvc/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def __init__(self, **kwargs):

self.jobs = kwargs.get("jobs") or self._JOBS
self.hash_jobs = kwargs.get("checksum_jobs") or self.HASH_JOBS
self._config = kwargs

@property
def config(self):
    """Return the keyword arguments stored on ``self._config``."""
    return self._config

@classmethod
def _strip_protocol(cls, path: str):
Expand Down
4 changes: 4 additions & 0 deletions dvc/fs/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self.repo = kwargs["repo"]

@property
def config(self):
    """Always raise ``NotImplementedError``.

    Config serialization is not available for this filesystem.
    """
    raise NotImplementedError

def _find_outs(self, path, *args, **kwargs):
outs = self.repo.find_outs_by_path(path, *args, **kwargs)

Expand Down
78 changes: 77 additions & 1 deletion dvc/fs/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,29 @@ class RepoFileSystem(BaseFileSystem): # pylint:disable=abstract-method

scheme = "local"
PARAM_CHECKSUM = "md5"
PARAM_REPO_URL = "repo_url"
PARAM_REPO_ROOT = "repo_root"
PARAM_REV = "rev"
PARAM_CACHE_DIR = "cache_dir"
PARAM_CACHE_TYPES = "cache_types"
PARAM_SUBREPOS = "subrepos"

def __init__(
self, repo=None, subrepos=False, repo_factory: RepoFactory = None
self,
repo: Optional["Repo"] = None,
subrepos=False,
repo_factory: RepoFactory = None,
**kwargs,
):
super().__init__()

from dvc.utils.collections import PathStringTrie

if repo is None:
repo, repo_factory = self._repo_from_fs_config(
subrepos=subrepos, **kwargs
)

if not repo_factory:
from dvc.repo import Repo

Expand Down Expand Up @@ -72,6 +87,67 @@ def repo_url(self):
return None
return self._main_repo.url

@property
def config(self):
    """Return a dict describing this filesystem, keyed by ``PARAM_*``.

    Holds the repo URL, repo root, the ``rev`` of the main repo's fs
    (``None`` when that fs has no ``rev`` attribute), the absolute local
    cache dir, the local cache types and the subrepo traversal flag.
    """
    fs_config = {}
    fs_config[self.PARAM_REPO_URL] = self.repo_url
    fs_config[self.PARAM_REPO_ROOT] = self.root_dir

    main_repo = self._main_repo
    fs_config[self.PARAM_REV] = getattr(main_repo.fs, "rev", None)

    local_odb = main_repo.odb.local
    fs_config[self.PARAM_CACHE_DIR] = os.path.abspath(local_odb.cache_dir)
    fs_config[self.PARAM_CACHE_TYPES] = local_odb.cache_types

    fs_config[self.PARAM_SUBREPOS] = self._traverse_subrepos
    return fs_config

@classmethod
def _repo_from_fs_config(
    cls, **config
) -> Tuple["Repo", Optional["RepoFactory"]]:
    """Open a repo from a filesystem config dict keyed by ``PARAM_*``.

    At least one of ``PARAM_REPO_URL`` / ``PARAM_REPO_ROOT`` must be set.
    When a URL is present the repo is opened with ``external_repo`` and an
    ``erepo_factory`` is returned alongside it; otherwise ``Repo.open`` is
    used on the root and the returned factory is ``None``.

    NOTE(review): the repo is returned from inside the ``with`` block, so
    the context manager's exit runs before the caller receives it --
    confirm the repo is still usable afterwards.
    """
    from dvc.external_repo import erepo_factory, external_repo
    from dvc.repo import Repo

    url = config.get(cls.PARAM_REPO_URL)
    root = config.get(cls.PARAM_REPO_ROOT)
    assert url or root

    # NOTE: if the original repo was an erepo (and has a URL), Repo.open()
    # cannot be used since it would skip erepo cache/remote setup for
    # local URLs.
    is_erepo = url is not None
    opener = external_repo if is_erepo else Repo.open

    cache_dir = config.get(cls.PARAM_CACHE_DIR)
    if cache_dir:
        cache_config = {
            "cache": {
                "dir": cache_dir,
                "type": config.get(cls.PARAM_CACHE_TYPES),
            }
        }
    else:
        cache_config = {}

    repo_kwargs: dict = {
        "rev": config.get(cls.PARAM_REV),
        "subrepos": config.get(cls.PARAM_SUBREPOS, False),
        "uninitialized": True,
    }
    factory: Optional["RepoFactory"] = None
    if is_erepo:
        repo_kwargs["cache_dir"] = cache_dir
        factory = erepo_factory(url, cache_config)
    else:
        repo_kwargs["config"] = cache_config

    target = url if url else root
    with opener(target, **repo_kwargs) as repo:
        return repo, factory

def _get_repo(self, path: str) -> Optional["Repo"]:
"""Returns repo that the path falls in, using prefix.
Expand Down
1 change: 1 addition & 0 deletions dvc/hash_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class HashInfo:
value: Optional[str]
size: Optional[int] = field(default=None, compare=False)
nfiles: Optional[int] = field(default=None, compare=False)
obj_name: Optional[str] = field(default=None, compare=False)

def __bool__(self):
return bool(self.value)
Expand Down
Loading

0 comments on commit 114a07e

Please sign in to comment.