-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
dvc.py
422 lines (334 loc) · 13.4 KB
/
dvc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
import errno
import functools
import logging
import ntpath
import os
import posixpath
import threading
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Type, Union

from fsspec.spec import AbstractFileSystem
from funcy import wrap_with

from dvc_objects.fs.base import FileSystem
from dvc_objects.fs.path import Path

from .data import DataFileSystem

if TYPE_CHECKING:
    from dvc.repo import Repo
    from dvc.types import StrPath
# Module logger, named after this module.
logger = logging.getLogger(__name__)

# Anything that yields a ``Repo`` when called: either a factory callable or
# the ``Repo`` class itself.
RepoFactory = Union[
    Callable[..., "Repo"],
    Type["Repo"],
]

# A path inside the filesystem, expressed as a tuple of its components.
Key = Tuple[str, ...]
def as_posix(path: str) -> str:
    """Return ``path`` with every Windows (``\\``) separator turned into ``/``."""
    pieces = path.split(ntpath.sep)
    return posixpath.sep.join(pieces)
# NOT the same as dvc.dvcfile.is_dvc_file()!
def _is_dvc_file(fname):
from dvc.dvcfile import is_valid_filename
from dvc.ignore import DvcIgnore
return is_valid_filename(fname) or fname == DvcIgnore.DVCIGNORE_FILE
def _merge_info(repo, fs_info, dvc_info):
    """Combine the Git/workspace entry and the DVC entry for one path.

    Args:
        repo: Repo the path belongs to; stored under ``"repo"``.
        fs_info: info dict from the Git/workspace filesystem, or a falsy value.
        dvc_info: info dict from the DVC data filesystem, or a falsy value.

    Returns:
        dict: ``type``/``size`` come from ``fs_info`` when it is present,
        otherwise from ``dvc_info``. ``md5`` is copied only for DVC-only
        entries. ``isexec`` is set whenever ``fs_info`` is present.
    """
    from . import utils

    merged = {"repo": repo}

    if dvc_info:
        merged.update(
            dvc_info=dvc_info,
            type=dvc_info["type"],
            size=dvc_info["size"],
        )
        if "md5" in dvc_info and not fs_info:
            merged["md5"] = dvc_info["md5"]

    if fs_info:
        is_file = fs_info["type"] == "file"
        merged["type"] = fs_info["type"]
        merged["size"] = fs_info["size"]
        # Only regular files carry a meaningful executable bit.
        merged["isexec"] = utils.is_exec(fs_info["mode"]) if is_file else False

    return merged
def _get_dvc_path(dvc_fs, subkey):
return dvc_fs.path.join(*subkey) if subkey else ""
class _DVCFileSystem(AbstractFileSystem):  # pylint:disable=abstract-method
    """Read-only fsspec filesystem over a repo's Git/workspace files merged
    with its DVC-tracked data, optionally descending into DVC subrepos."""

    cachable = False
    root_marker = "/"

    def __init__(
        self,
        url: Optional[str] = None,
        rev: Optional[str] = None,
        repo: Optional["Repo"] = None,
        subrepos: bool = False,
        repo_factory: Optional[RepoFactory] = None,
        **repo_kwargs: Any,
    ) -> None:
        """DVC + git-tracked files fs.

        Args:
            url (str, optional): URL or path to a DVC/Git repository.
                Defaults to a DVC repository in the current working directory.
                Both HTTP and SSH protocols are supported for remote Git repos
                (e.g. [user@]server:project.git).
                Only used when ``repo`` is not given.
            rev (str, optional): Any Git revision such as a branch or tag name,
                a commit hash or a dvc experiment name.
                Defaults to the default branch in case of remote repositories.
                In case of a local repository, if rev is unspecified, it will
                default to the working directory.
                If the repo is not a Git repo, this option is ignored.
            repo (:obj:`Repo`, optional): `Repo` instance.
            subrepos (bool): traverse to subrepos.
                By default, it ignores subrepos.
            repo_factory (callable): A function to initialize subrepo with.
                The default is `Repo`. When ``repo`` is not given, the factory
                configured on the newly opened repo is used instead.
            **repo_kwargs: extra keyword arguments forwarded to ``Repo.open``
                when ``repo`` is not given.

        Examples:
            - Opening a filesystem from repo in current working directory

            >>> fs = DVCFileSystem()

            - Opening a filesystem from local repository

            >>> fs = DVCFileSystem("path/to/local/repository")

            - Opening a remote repository

            >>> fs = DVCFileSystem(
            ...    "https://github.com/iterative/example-get-started",
            ...    rev="main",
            ... )
        """
        from pygtrie import Trie

        super().__init__()
        if repo is None:
            repo = self._make_repo(url=url, rev=rev, subrepos=subrepos, **repo_kwargs)
            assert repo is not None
            # pylint: disable=protected-access
            repo_factory = repo._fs_conf["repo_factory"]

        if not repo_factory:
            from dvc.repo import Repo

            self.repo_factory: RepoFactory = Repo
        else:
            self.repo_factory = repo_factory

        def _getcwd():
            # The cwd of this fs is the process cwd relative to the repo
            # root, or the root marker when the process is outside the repo.
            relparts = ()
            assert repo is not None
            if repo.fs.path.isin(repo.fs.path.getcwd(), repo.root_dir):
                relparts = repo.fs.path.relparts(repo.fs.path.getcwd(), repo.root_dir)
            return self.root_marker + self.sep.join(relparts)

        self.path = Path(self.sep, getcwd=_getcwd)
        self.repo = repo
        self.hash_jobs = repo.fs.hash_jobs
        self._traverse_subrepos = subrepos

        # Maps every key (tuple of path parts) seen so far to the repo it
        # belongs to.
        self._subrepos_trie = Trie()

        key = self._get_key(self.repo.root_dir)
        self._subrepos_trie[key] = repo

        # One DataFileSystem per repo that has a DVC dir, keyed by repo key.
        self._datafss = {}
        if hasattr(repo, "dvc_dir"):
            self._datafss[key] = DataFileSystem(index=repo.index.data["repo"])

    def _get_key(self, path: "StrPath") -> Key:
        """Convert a path on the root repo's fs into a key relative to its root.

        The repo root itself maps to the empty tuple.
        """
        parts = self.repo.fs.path.relparts(path, self.repo.root_dir)
        if parts == (os.curdir,):
            return ()
        return parts

    def _get_key_from_relative(self, path) -> Key:
        """Convert a path of this filesystem (rooted at ``/``) into a key."""
        parts = self.path.relparts(path, self.root_marker)
        if parts and parts[0] == os.curdir:
            return parts[1:]
        return parts

    def _from_key(self, parts: Key) -> str:
        """Join key ``parts`` onto the root repo's ``root_dir``."""
        return self.repo.fs.path.join(self.repo.root_dir, *parts)

    @property
    def repo_url(self):
        """URL of the root repo (``self.repo.url``)."""
        return self.repo.url

    @classmethod
    def _make_repo(cls, **kwargs) -> "Repo":
        """Open a repo with ``Repo.open(uninitialized=True, **kwargs)``."""
        from dvc.repo import Repo

        with Repo.open(uninitialized=True, **kwargs) as repo:
            return repo

    def _get_repo(self, key: Key) -> "Repo":
        """Returns repo that the path falls in, using prefix.

        If the path is already tracked/collected, it just returns the repo.

        Otherwise, it collects the repos that might be in the path's parents
        and then returns the appropriate one.
        """
        repo = self._subrepos_trie.get(key)
        if repo:
            return repo

        prefix_key, repo = self._subrepos_trie.longest_prefix(key)
        # Every directory between the known prefix and ``key`` (inclusive).
        dir_keys = (key[:i] for i in range(len(prefix_key) + 1, len(key) + 1))
        self._update(dir_keys, starting_repo=repo)
        return self._subrepos_trie.get(key) or self.repo

    @wrap_with(threading.Lock())
    def _update(self, dir_keys, starting_repo):
        """Checks for subrepo in directories and updates them.

        Each key in ``dir_keys`` is mapped to the innermost repo found so far.
        The lock is created once, when the class is defined, so it is shared
        by every instance.
        """
        repo = starting_repo
        for key in dir_keys:
            d = self._from_key(key)
            if self._is_dvc_repo(d):
                repo = self.repo_factory(
                    d,
                    fs=self.repo.fs,
                    scm=self.repo.scm,
                    repo_factory=self.repo_factory,
                )
                self._datafss[key] = DataFileSystem(index=repo.index.data["repo"])
            self._subrepos_trie[key] = repo

    def _is_dvc_repo(self, dir_path):
        """Check if the directory is a dvc repo.

        Always False unless subrepo traversal is enabled. Otherwise, checks
        whether ``<dir_path>/<Repo.DVC_DIR>`` is a directory.
        """
        if not self._traverse_subrepos:
            return False

        from dvc.repo import Repo

        repo_path = self.repo.fs.path.join(dir_path, Repo.DVC_DIR)
        return self.repo.fs.isdir(repo_path)

    def _get_subrepo_info(
        self, key: Key
    ) -> Tuple["Repo", Optional[DataFileSystem], Key]:
        """
        Returns information about the subrepo the key is part of.

        Returns:
            ``(repo, dvc_fs, subkey)``: the owning repo, its DataFileSystem
            (``None`` when that repo has none registered), and ``key``
            expressed relative to that repo's root.
        """
        repo = self._get_repo(key)
        repo_key: Key
        if repo is self.repo:
            repo_key = ()
            subkey = key
        else:
            repo_key = self._get_key(repo.root_dir)
            subkey = key[len(repo_key) :]

        dvc_fs = self._datafss.get(repo_key)
        return repo, dvc_fs, subkey

    def _open(
        self, path, mode="rb", **kwargs
    ):  # pylint: disable=arguments-renamed, arguments-differ
        """Open ``path`` for binary reading.

        Tries the Git/workspace filesystem first, then falls back to the DVC
        data filesystem. When the ``cache_remote_stream`` kwarg is truthy,
        ``repo.cache.local`` is passed to it as ``cache_odb``.

        Raises:
            OSError: with ``EROFS`` for any mode other than ``"rb"``.
            FileNotFoundError: if neither source has the file.
        """
        if mode != "rb":
            raise OSError(errno.EROFS, os.strerror(errno.EROFS))

        key = self._get_key_from_relative(path)
        fs_path = self._from_key(key)
        try:
            return self.repo.fs.open(fs_path, mode=mode)
        except FileNotFoundError:
            repo, dvc_fs, subkey = self._get_subrepo_info(key)
            if not dvc_fs:
                raise

        dvc_path = _get_dvc_path(dvc_fs, subkey)
        kw = {}
        if kwargs.get("cache_remote_stream", False):
            kw["cache_odb"] = repo.cache.local
        return dvc_fs.open(dvc_path, mode=mode, **kw)

    def isdvc(self, path, **kwargs) -> bool:
        """Is this entry dvc-tracked?

        Returns False when the owning repo has no DVC data filesystem.
        """
        key = self._get_key_from_relative(path)
        _, dvc_fs, subkey = self._get_subrepo_info(key)
        # Guard before building the DVC path: ``_get_dvc_path`` dereferences
        # ``dvc_fs.path`` for any non-empty subkey.
        if dvc_fs is None:
            return False
        dvc_path = _get_dvc_path(dvc_fs, subkey)
        return dvc_fs.isdvc(dvc_path, **kwargs)

    def ls(  # pylint: disable=arguments-differ # noqa: C901
        self, path, detail=True, dvc_only=False, **kwargs
    ):
        """List ``path``, merging Git/workspace and DVC-tracked entries.

        Args:
            path: directory path on this filesystem.
            detail: return info dicts when True, plain paths otherwise.
            dvc_only: skip the Git/workspace listing entirely.
            **kwargs: ``ignore_subrepos`` (default True) is forwarded to the
                dvcignore-aware listing. ``dvcfiles`` (default False) keeps
                DVC-owned files in the result.

        Raises:
            FileNotFoundError: if ``path`` exists in neither source.
        """
        key = self._get_key_from_relative(path)
        repo, dvc_fs, subkey = self._get_subrepo_info(key)

        dvc_exists = False
        dvc_infos = {}
        if dvc_fs:
            dvc_path = _get_dvc_path(dvc_fs, subkey)
            with suppress(FileNotFoundError):
                for info in dvc_fs.ls(dvc_path, detail=True):
                    dvc_infos[dvc_fs.path.name(info["name"])] = info
            dvc_exists = bool(dvc_infos) or dvc_fs.exists(dvc_path)

        fs_exists = False
        fs_infos = {}
        ignore_subrepos = kwargs.get("ignore_subrepos", True)
        if not dvc_only:
            fs = self.repo.fs
            fs_path = self._from_key(key)
            try:
                for info in repo.dvcignore.ls(
                    fs, fs_path, detail=True, ignore_subrepos=ignore_subrepos
                ):
                    fs_infos[fs.path.name(info["name"])] = info
            except (FileNotFoundError, NotADirectoryError):
                pass
            fs_exists = bool(fs_infos) or fs.exists(fs_path)

        # Only a path missing from both sources is an error. An existing but
        # empty directory lists as []. This also covers a broken symlink or
        # a TreeError.
        if not (dvc_exists or fs_exists):
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)

        dvcfiles = kwargs.get("dvcfiles", False)

        infos = []
        paths = []
        names = set(dvc_infos.keys()) | set(fs_infos.keys())
        for name in names:
            if not dvcfiles and _is_dvc_file(name):
                continue

            entry_path = self.path.join(path, name)
            info = _merge_info(repo, fs_infos.get(name), dvc_infos.get(name))
            info["name"] = entry_path
            infos.append(info)
            paths.append(entry_path)

        if not detail:
            return paths

        return infos

    def info(self, path, **kwargs):
        """Return merged info for ``path``.

        ``ignore_subrepos`` (default True) is taken from kwargs.
        """
        key = self._get_key_from_relative(path)
        ignore_subrepos = kwargs.get("ignore_subrepos", True)
        return self._info(key, path, ignore_subrepos=ignore_subrepos)

    def _info(  # noqa: C901, PLR0912
        self, key, path, ignore_subrepos=True, check_ignored=True
    ):
        """Build merged info for ``key``; ``path`` is reported as ``name``.

        Raises:
            FileNotFoundError: if neither source has the entry.
        """
        repo, dvc_fs, subkey = self._get_subrepo_info(key)

        dvc_info = None
        if dvc_fs:
            try:
                dvc_info = dvc_fs.fs.index.info(subkey)
                dvc_path = _get_dvc_path(dvc_fs, subkey)
                dvc_info["name"] = dvc_path
            except FileNotFoundError:
                pass

        fs_info = None
        fs = self.repo.fs
        fs_path = self._from_key(key)
        try:
            fs_info = fs.info(fs_path)
            if check_ignored and repo.dvcignore.is_ignored(
                fs, fs_path, ignore_subrepos=ignore_subrepos
            ):
                fs_info = None
        except (FileNotFoundError, NotADirectoryError):
            if not dvc_info:
                raise

        # NOTE: if some parent in fs_path turns out to be a file, it means
        # that the whole repofs branch doesn't exist.
        if dvc_info and not fs_info:
            for parent in fs.path.parents(fs_path):
                try:
                    if fs.info(parent)["type"] != "directory":
                        dvc_info = None
                        break
                except (FileNotFoundError, NotADirectoryError):
                    # A parent below a file may raise NotADirectoryError;
                    # keep walking up until the offending file is found.
                    continue

        if not dvc_info and not fs_info:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)

        info = _merge_info(repo, fs_info, dvc_info)
        info["name"] = path
        return info

    def get_file(self, rpath, lpath, **kwargs):  # pylint: disable=arguments-differ
        """Copy ``rpath`` to local ``lpath``.

        Tries the Git/workspace filesystem first, then the DVC data
        filesystem.
        """
        key = self._get_key_from_relative(rpath)
        fs_path = self._from_key(key)
        try:
            return self.repo.fs.get_file(fs_path, lpath, **kwargs)
        except FileNotFoundError:
            _, dvc_fs, subkey = self._get_subrepo_info(key)
            if not dvc_fs:
                raise

        dvc_path = _get_dvc_path(dvc_fs, subkey)
        return dvc_fs.get_file(dvc_path, lpath, **kwargs)
class DVCFileSystem(FileSystem):
    """dvc_objects ``FileSystem`` wrapper around ``_DVCFileSystem``.

    Attribute and method access is delegated to the underlying
    ``_DVCFileSystem`` built from ``self.fs_args``.
    """

    protocol = "local"
    PARAM_CHECKSUM = "md5"

    def _prepare_credentials(self, **config) -> Dict[str, Any]:
        # No credential handling is needed: the config is passed through unchanged.
        return config

    @functools.cached_property
    # pylint: disable-next=invalid-overridden-method
    def fs(self) -> "_DVCFileSystem":
        """Lazily built ``_DVCFileSystem``, created once per instance."""
        # NOTE(review): ``fs_args`` is presumably populated by the
        # dvc_objects ``FileSystem`` base from ``_prepare_credentials``.
        return _DVCFileSystem(**self.fs_args)

    def isdvc(self, path, **kwargs) -> bool:
        """Is ``path`` DVC-tracked? Delegates to ``self.fs.isdvc``."""
        return self.fs.isdvc(path, **kwargs)

    @property
    def path(self) -> Path:  # pylint: disable=invalid-overridden-method
        """Path helper of the underlying ``_DVCFileSystem``."""
        return self.fs.path

    @property
    def repo(self) -> "Repo":
        """Root repo of the underlying ``_DVCFileSystem``."""
        return self.fs.repo

    @property
    def repo_url(self) -> str:
        """URL of the root repo."""
        return self.fs.repo_url

    def from_os_path(self, path: str) -> str:
        """Convert an OS path to a POSIX-style path for this filesystem.

        Absolute paths are first made relative to ``self.repo.root_dir``.
        Relative paths only have their separators normalized.
        """
        if os.path.isabs(path):
            path = os.path.relpath(path, self.repo.root_dir)
        return as_posix(path)