# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See LICENSE file distributed along with the datalad_osf package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""'tree'-like command for visualizing dataset hierarchies"""
__docformat__ = "numpy"
import logging
from functools import wraps, lru_cache
from os import readlink
from pathlib import Path
from datalad.interface.base import (
Interface,
build_doc,
)
from datalad.support.exceptions import (
CapturedException,
NoDatasetFound
)
from datalad.support.param import Parameter
from datalad.distribution.dataset import (
datasetmethod,
require_dataset,
Dataset,
)
from datalad.interface.results import (
get_status_dict,
)
from datalad.interface.utils import eval_results
from datalad.local.subdatasets import Subdatasets
from datalad.support.constraints import (
EnsureNone,
EnsureStr,
EnsureInt,
EnsureRange,
)
from datalad.utils import get_dataset_root
from datalad.ui import ui
lgr = logging.getLogger('datalad.local.tree')
@build_doc
class TreeCommand(Interface):
"""Visualize directory and dataset hierarchies
This command mimics the UNIX/MSDOS 'tree' utility to generate and
display a directory tree, with DataLad-specific enhancements.
It can serve the following purposes:
1. Glorified 'tree' command
2. Dataset discovery
3. Programmatic directory traversal
*Glorified 'tree' command*
The rendered command output uses 'tree'-style visualization::
        /tmp/mydir
        ├── [DS~0] ds_A/
        │   └── [DS~1] subds_A/
        └── [DS~0] ds_B/
            ├── dir_B/
            │   ├── file.txt
            │   ├── subdir_B/
            │   └── [DS~1] subds_B0/
            └── [DS~1] (not installed) subds_B1/

        5 datasets, 2 directories, 1 file
Dataset paths are prefixed by a marker indicating subdataset hierarchy
level, like ``[DS~1]``.
This is the absolute subdataset level, meaning it may also take into
account superdatasets located above the tree root and thus not included
in the output.
If a subdataset is registered but not installed (such as after a
non-recursive ``datalad clone``), it will be prefixed by ``(not
installed)``. Only DataLad datasets are considered, not pure
git/git-annex repositories.
The 'report line' at the bottom of the output shows the count of
displayed datasets, in addition to the count of directories and
files. In this context, datasets and directories are mutually
exclusive categories.
By default, only directories (no files) are included in the tree,
and hidden directories are skipped. Both behaviours can be changed
using command options.
Symbolic links are always followed.
This means that a symlink pointing to a directory is traversed and
counted as a directory (unless it potentially creates a loop in
the tree).
*Dataset discovery*
Using the [CMD: ``--dataset-depth`` CMD][PY: ``dataset_depth`` PY]
option, this command generates the layout of dataset hierarchies based on
subdataset nesting level, regardless of their location in the
filesystem.
In this case, tree depth is determined by subdataset depth. This mode
is therefore suited for discovering available datasets when their
location is not known in advance.
By default, only datasets are listed, without their contents. If
[CMD: ``--depth`` CMD][PY: ``depth`` PY] is specified additionally,
the contents of each dataset will be included up to [CMD:
``--depth`` CMD][PY: ``depth`` PY] directory levels.
Tree filtering options such as [CMD: ``--include-hidden`` CMD][PY:
``include_hidden`` PY] only affect which directories are
reported/displayed, not which directories are traversed to find datasets.
*Programmatic directory traversal*
The command yields a result record for each tree node (dataset,
directory or file). The following properties are reported, where available:
"path"
Absolute path of the tree node
"type"
Type of tree node: "dataset", "directory" or "file"
"depth"
Directory depth of node relative to the tree root
"exhausted_levels"
Depth levels for which no nodes are left to be generated (the
respective subtrees have been 'exhausted')
"count"
Dict with cumulative counts of datasets, directories and files in the
tree up until the current node. File count is only included if the
command is run with the [CMD: ``--include-files`` CMD][PY:
``include_files`` PY]
option.
"dataset_depth"
Subdataset depth level relative to the tree root. Only included for
node type "dataset".
"dataset_abs_depth"
Absolute subdataset depth level. Only included for node type "dataset".
"dataset_is_installed"
Whether the registered subdataset is installed. Only included for node
type "dataset".
"symlink_target"
If the tree node is a symlink, the path to the link target
"is_broken_symlink"
If the tree node is a symlink, whether it is a broken symlink
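
    Example of consuming the result records programmatically (a sketch;
    assumes this command is installed and exposed through the DataLad
    Python API)::

        from datalad.api import tree
        for res in tree("/tmp", depth=1,
                        return_type="generator",
                        result_renderer="disabled"):
            print(res["type"], res["path"])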
"""
result_renderer = 'tailored'
_params_ = dict(
path=Parameter(
args=("path",),
nargs='?',
doc="""path to directory from which to generate the tree.
Defaults to the current directory.""",
constraints=EnsureStr() | EnsureNone()),
depth=Parameter(
args=("--depth",),
doc="""maximum level of subdirectories to include in the tree.
If not specified, will generate the full tree with no depth
constraint.
If paired with [CMD: ``--dataset-depth`` CMD][PY:
``dataset_depth`` PY], refers to the maximum directory level to
generate underneath each dataset.""",
constraints=EnsureInt() & EnsureRange(min=0) | EnsureNone()),
dataset_depth=Parameter(
args=("--dataset-depth",),
doc="""maximum level of nested subdatasets to include in the
tree. 0 means only top-level datasets, 1 means top-level
datasets and their immediate subdatasets, etc.""",
constraints=EnsureInt() & EnsureRange(min=0) | EnsureNone()),
include_files=Parameter(
args=("--include-files",),
doc="""include files in the tree""",
action='store_true'),
include_hidden=Parameter(
args=("--include-hidden",),
doc="""include hidden files/directories in the tree. This
option does not affect which directories will be searched for
datasets when specifying [CMD: ``--dataset-depth`` CMD][PY:
``dataset_depth`` PY]. For example, datasets located underneath
the hidden folder `.datalad` will be reported even if [CMD:
``--include-hidden`` CMD][PY: ``include_hidden`` PY] is omitted.""",
action='store_true'),
)
_examples_ = [
dict(text="Show up to 3 levels of subdirectories below the current "
"directory, including files and hidden contents",
code_py="tree(depth=3, include_files=True, include_hidden=True)",
code_cmd="datalad tree --depth 3 --include-files --include-hidden"),
dict(text="Find all top-level datasets located anywhere under ``/tmp``",
code_py="tree('/tmp', dataset_depth=0)",
code_cmd="datalad tree /tmp --dataset-depth 0"),
dict(text="Report first- and second-level subdatasets and their "
"directory contents, up to 1 subdirectory deep within each "
"dataset",
code_py="tree(dataset_depth=2, depth=1)",
code_cmd="datalad tree --dataset-depth 2 --depth 1"),
]
@staticmethod
@datasetmethod(name='tree')
@eval_results
def __call__(
path='.',
*,
depth=None,
dataset_depth=None,
include_files=False,
include_hidden=False):
if dataset_depth is not None:
# special tree defined by subdataset nesting depth
tree_cls = DatasetTree
dataset_tree_args = {"max_dataset_depth": dataset_depth}
else:
# simple tree defined by directory depth
tree_cls = Tree
dataset_tree_args = {}
tree = tree_cls(
Path(path),
max_depth=depth,
exclude_node_func=build_excluded_node_func(
include_hidden=include_hidden, include_files=include_files),
**dataset_tree_args
)
for node in tree.generate_nodes():
# yield one node at a time to improve UX / perceived speed
res_dict = {
"action": "tree",
"path": str(node.path),
"type": node.TYPE,
"depth": node.depth,
"exhausted_levels": list(tree.exhausted_levels),
"count": {
"datasets": tree.node_count["DatasetNode"],
"directories": tree.node_count["DirectoryNode"],
**({"files": tree.node_count["FileNode"]}
if include_files else {})
},
}
if node.TYPE == "dataset":
res_dict.update({
"dataset_depth": node.ds_depth,
"dataset_abs_depth": node.ds_absolute_depth,
"dataset_is_installed": node.is_installed
})
if node.is_symlink():
# TODO: should we inform if the symlink is recursive (as per
# `tree.is_recursive_symlink()`) although not broken? The
# UNIX 'tree' command shows the message '[recursive,
# not followed]' next to the path. Not sure if this is
# interesting at all or more confusing.
res_dict["symlink_target"] = node.get_symlink_target()
res_dict["is_broken_symlink"] = node.is_broken_symlink()
if node.exception is not None:
# mimic error message of unix 'tree' command for
# permission denied error, otherwise use exception short
# message
message = "error opening dir" \
if node.exception.name == "PermissionError" \
else node.exception.message
yield get_status_dict(
status="error",
message=message,
exception=node.exception,
**res_dict
)
else:
yield get_status_dict(
status="ok",
**res_dict
)
@staticmethod
def custom_result_renderer(res, **kwargs):
"""
Each node is printed on one line. The string uses the format:
        ``[<indentation>] [<branch_tip_symbol>] [<ds_marker>] <path>``
Example line:
        ``│   │   ├── path_dir_level3``
"""
from datalad.support import ansi_colors
# get values from result record
node_type = res["type"]
node_path = res["path"]
depth = res["depth"]
exhausted_levels = res["exhausted_levels"]
# build indentation string
indentation = ""
if depth > 0:
indentation_symbols_for_levels = [
("│"
if level not in exhausted_levels
else " ") + " "
for level in range(1, depth)
]
indentation = "".join(indentation_symbols_for_levels)
# build prefix (tree branch tip)
prefix = ""
if depth > 0: # root node has no prefix
is_last_child = depth in exhausted_levels
prefix = "└──" if is_last_child else "├──"
# build dataset marker if dataset
ds_marker = ""
if node_type == "dataset":
ds_absolute_depth = res["dataset_abs_depth"]
ds_is_installed = res["dataset_is_installed"]
ds_marker_depth = ansi_colors.color_word(
f"DS~{ds_absolute_depth}",
ansi_colors.WHITE)
install_flag = " (not installed)" if not ds_is_installed else ""
ds_marker = f"[{ds_marker_depth}]" + install_flag
# build path string with optional color
# display only root directory with full path, all other nodes
# with basename
path = node_path if depth == 0 else Path(node_path).name
color_for_type = {
"dataset": ansi_colors.MAGENTA,
"directory": ansi_colors.BLUE,
"file": None,
"broken_symlink": ansi_colors.RED
}
# ANSI color for the path, if terminal colors are enabled
color = color_for_type[node_type]
if color is not None:
path = ansi_colors.color_word(path, color)
if res.get("is_broken_symlink", False):
path = ansi_colors.color_word(path,
color_for_type["broken_symlink"])
# set suffix for directories
dir_suffix = ""
if depth > 0 and node_type in ("directory", "dataset"):
dir_suffix = "/"
# append symlink target if symlink
symlink_target = ""
if "symlink_target" in res:
symlink_target = " -> " + res["symlink_target"]
# add short error message if there was exception
error_msg = ""
if "exception" in res:
error_msg = f" [{res['message']}]"
line = indentation + \
" ".join((s for s in (prefix, ds_marker, path) if s != "")) + \
dir_suffix + symlink_target + error_msg
ui.message(line)
@staticmethod
def custom_result_summary_renderer(res, **kwargs):
"""Print the summary 'report line' with count of nodes by type"""
c_ds = res[-1]['count']['datasets']
c_dirs = res[-1]['count']['directories']
# files may not be included in results (if not using command
# option '--include-files')
c_files = res[-1]['count'].get('files')
descriptions = [
f"{c_ds} " + ("dataset" if int(c_ds) == 1 else "datasets"),
f"{c_dirs} " + ("directory" if int(c_dirs) == 1 else "directories")
]
if c_files is not None:
descriptions.append(
f"{c_files} " + ("file" if int(c_files) == 1 else "files"))
ui.message("\n" + ", ".join(descriptions))
def build_excluded_node_func(include_hidden=False, include_files=False):
"""Return a function to exclude ``_TreeNode`` objects from the tree
(prevents them from being yielded by the node generator).
Returns
-------
Callable
        Function that takes a ``_TreeNode`` object as input and returns
        ``True`` if the node should *not* be displayed in the tree.
"""
def is_excluded(node: _TreeNode):
return any((
isinstance(node, FileNode) if not include_files else False,
node.path.name.startswith(".") if not include_hidden else False
))
return is_excluded
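# A minimal usage sketch of the factory above (hypothetical paths; the
# FileNode and DirectoryNode classes are defined further down in the
# full module):
#
#   exclude = build_excluded_node_func(include_hidden=False,
#                                      include_files=False)
#   exclude(FileNode(Path("/tmp/report.txt"), 1))   # True: files excluded
#   exclude(DirectoryNode(Path("/tmp/.cache"), 1))  # True: hidden directory
#   exclude(DirectoryNode(Path("/tmp/data"), 1))    # False: node is kept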
def increment_node_count(node_generator_func):
"""Decorator for incrementing the node count whenever a ``_TreeNode`` is
yielded.
Parameters
----------
node_generator_func: Callable
Function that yields ``_TreeNode`` objects
"""
@wraps(node_generator_func)
def _wrapper(*args, **kwargs):
self = args[0] # 'self' is a Tree instance
for node in node_generator_func(*args, **kwargs):
node_type = node.__class__.__name__
if node_type not in self.node_count:
raise ValueError(
f"No counts collected for unknown node type '{node_type}'"
)
if node.depth > 0: # do not count the root directory
# TODO: do not count symlinks if they point to
# files/directories that are already included in the tree
# (to prevent double counting)? Note that UNIX 'tree' does
# count double.
self.node_count[node_type] += 1
yield node # yield what the generator yielded
return _wrapper
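# The decorator above wraps the `generate_nodes()` methods below; after a
# full traversal, `tree.node_count` could look like, e.g. (hypothetical
# numbers): {"DatasetNode": 2, "DirectoryNode": 5, "FileNode": 0}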
def yield_with_last_item(generator):
"""Takes a generator and yields for each item, the item itself and
whether it is the last item in the sequence.
Returns
-------
Tuple[bool, Any]
A tuple (is_last_item, item)
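
    Examples
    --------
    Plain integers stand in for tree nodes in this illustrative doctest:

    >>> list(yield_with_last_item(iter([1, 2, 3])))
    [(False, 1), (False, 2), (True, 3)]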
"""
prev_val = next(generator, None)
if prev_val is not None:
for current_val in generator:
yield False, prev_val
prev_val = current_val
yield True, prev_val
def path_depth(path: Path, root: Path):
"""Calculate directory depth of a path relative to the given root.
Can also be a negative integer if the path is a parent of the
tree root.
Returns
-------
int
Number of levels of the given path *below* the root (positive
integer) or *above* the tree root (negative integer)
Raises
------
ValueError
Like ``path.relative_to()``, raises ``ValueError`` if the path is not
relative to the root
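
    Examples
    --------
    An illustrative doctest (hypothetical POSIX paths):

    >>> path_depth(Path("/tmp/a/b"), Path("/tmp"))
    2
    >>> path_depth(Path("/tmp"), Path("/tmp/a/b"))
    -2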
"""
sign = 1
try:
rpath = path.relative_to(root)
except ValueError:
try:
rpath = root.relative_to(path)
sign = -1
except ValueError:
raise ValueError(
"Could not calculate directory depth: "
f"'{path}' is not relative to the tree root "
f"'{root}' (or vice-versa)")
return sign * len(rpath.parts)
def is_empty_dir(path: Path):
"""Does not check that path is a directory (to avoid extra
system calls)"""
return not any(path.iterdir())
@lru_cache
def is_dataset(path: Path, installed_only=False):
"""Fast dataset detection.
Infer that a directory is a dataset if it is either:
- installed, or
    - not installed, but has an installed superdataset (only if argument
``installed_only`` is False)
Only consider datalad datasets, not plain git/git-annex repos.
    Symlinks pointing to datasets are not resolved, so this check always
    returns False for symlinks. This prevents potentially detecting
    duplicate datasets
if the symlink and its target are both included in the tree.
Results are cached because the check is somewhat expensive and may
be run multiple times on the same path.
Parameters
----------
path: Path
Path to directory to be identified as dataset or non-dataset
installed_only: bool
Whether to ignore datasets that are not installed
"""
try:
if path.is_symlink():
# ignore symlinks even if pointing to datasets, otherwise we may
# get duplicate counts of datasets
lgr.debug("Path is a symlink, will not check if it points to a "
f"dataset: '{path}'")
return False
if (path / ".datalad" / "config").is_file():
# could also query `ds.id`, but checking just for existence
# of config file is quicker.
return True
# if it is not installed, check if it has an installed superdataset.
# instead of querying ds.is_installed() (which checks if the
# directory has the .git folder), we check if the directory
# is empty (faster) -- as e.g. after a non-recursive `datalad clone`
if not installed_only:
if is_empty_dir(path):
return get_superdataset(path) is not None
except Exception as ex:
# if anything fails (e.g. permission denied), we raise exception
# instead of returning False. this can be caught and handled by the
# caller.
raise NoDatasetFound(f"Cannot determine if '{path.name}' is a "
f"dataset") from ex
return False
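# An illustrative sketch of the distinction above (hypothetical layout):
# given an empty directory /tmp/super/sub that is registered as a
# subdataset of the installed dataset /tmp/super,
#
#   is_dataset(Path("/tmp/super/sub"))                       # True
#   is_dataset(Path("/tmp/super/sub"), installed_only=True)  # False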
@lru_cache
def get_subds_paths(ds_path: Path):
"""Return paths of immediate subdatasets for a given dataset path."""
# This is an expensive operation because it calls git to read the
# submodules. Since we need to run it to (A) calculate dataset depth and
# (B) detect non-installed datasets, we cache results, so that the list of
# subdatasets is computed only once for each parent dataset.
def res_filter(res):
return res.get('status') == 'ok' and res.get('type') == 'dataset'
# call subdatasets command instead of dataset method `ds.subdatasets()`
# to avoid potentially expensive import of full datalad API
return Subdatasets.__call__(
dataset=ds_path,
recursive=False,
state='any', # include not-installed subdatasets
result_filter=res_filter,
on_failure='ignore',
result_xfm='paths',
result_renderer='disabled',
return_type='list'
)
@lru_cache
def get_dataset_root_datalad_only(path: Path):
"""Get root of dataset containing a given path (datalad datasets only,
not pure git/git-annex repo)
Parameters
----------
path: Path
Path to file or directory
Returns
-------
Path
"""
ds_root = path
while ds_root:
potential_ds_root = get_dataset_root(str(ds_root))
if potential_ds_root is None:
return None # we are not inside a dataset
potential_ds_root = Path(potential_ds_root)
if is_dataset(potential_ds_root, installed_only=True):
return potential_ds_root # it's a match
# we go one directory higher and try again
ds_root = (potential_ds_root / "..").resolve(strict=True)
return ds_root
@lru_cache
def get_superdataset(path: Path):
"""Reimplementation of ``Dataset.get_superdataset()`` to allow caching
results of `ds.subdatasets()` (the most expensive operation).
Parameters
----------
path: Path
Path to a dataset
Returns
-------
Dataset or None
"""
superds_path = None
while path:
parent_path = (path / "..").resolve(strict=True)
sds_path_ = get_dataset_root_datalad_only(parent_path)
if sds_path_ is None:
# no more parents, use previous found
break
superds = Dataset(sds_path_)
# test if path is registered subdataset of the parent
        if str(path) not in get_subds_paths(superds.pathobj):
break
# That was a good candidate
superds_path = sds_path_
path = parent_path
break
if superds_path is None:
# None was found
return None
return Dataset(superds_path)
def is_path_relative_to(my_path: Path, other_path: Path):
"""Port of pathlib's ``Path.is_relative_to()`` (requires python3.9+)"""
try:
my_path.relative_to(other_path)
return True
except ValueError:
return False
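# For example, is_path_relative_to(Path("/a/b/c"), Path("/a")) is True,
# while is_path_relative_to(Path("/a"), Path("/a/b/c")) is False.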
class Tree:
"""Main class for generating and serializing a directory tree"""
def __init__(self,
root: Path,
max_depth=None,
exclude_node_func=None):
"""
Parameters
----------
root: Path
Directory to be used as tree root
max_depth: int or None
Maximum directory depth for traversing the tree
exclude_node_func: Callable or None
Function to filter out tree nodes from the tree
"""
self.root = root.resolve(strict=False)
try:
assert self.root.is_dir(), f"path is not a directory: {self.root}"
except (AssertionError, OSError) as ex: # could be permission error
raise ValueError(f"directory not found: '{root}'") from ex
self.max_depth = max_depth
if max_depth is not None and max_depth < 0:
raise ValueError("max_depth must be >= 0")
# set callable to exclude nodes from the tree, meaning they
# will not be yielded by the node generator
self.exclude_node_func = exclude_node_func or self.default_exclude_func
# keep track of levels where the subtree is exhausted, i.e. we
# have reached the last node of the current subtree.
# this is needed for the custom results renderer, to display nodes
# differently depending on whether they are the last child or not.
self.exhausted_levels = set([])
# store dict with count of nodes for each node type, similar to the
# tree command's 'report line' at the end of the output.
# the node types (subclasses of ``_TreeNode``) are mutually exclusive,
        # so the sum of their counts equals the total node count.
# does not count the root itself, only the contents below the root.
self.node_count = {node_type.__name__: 0
for node_type in _TreeNode.__subclasses__()}
@staticmethod
def default_exclude_func(node):
"""By default, exclude files and hidden directories from the tree"""
return any(
(isinstance(node, FileNode), node.path.name.startswith("."))
)
def path_depth(self, path: Path):
return path_depth(path, self.root)
def _generate_tree_nodes(self, dir_path: Path):
"""Recursively yield ``_TreeNode`` objects starting from ``dir_path``
Parameters
----------
dir_path: Path
Directory from which to calculate the tree
"""
# yield current directory/dataset node
current_depth = self.path_depth(dir_path)
current_node = Node(dir_path, current_depth)
yield current_node
# check that we are within max_depth levels
# (None means unlimited depth)
if self.max_depth is None or \
current_depth < self.max_depth:
if current_node.is_symlink() and \
current_node.is_recursive_symlink(self.max_depth):
# if symlink points to directory that we may visit or may
# have visited already, do not recurse into it
lgr.debug(f"Symlink is potentially recursive, "
f"will not traverse target directory: '{dir_path}'")
return
if current_node.exception is not None:
# if some exception occurred when instantiating the node
# (missing permissions etc), do not recurse into directory
lgr.debug("Node has exception, will not traverse directory: "
f"path={current_node.path}, exc={current_node.exception}")
return
# sort child nodes alphabetically
# needs to be done *before* calling the exclusion function,
# because the function may depend on sort order
            all_children = sorted(dir_path.iterdir())
child_depth = current_depth + 1
# generator to apply exclusion filter
def children():
for child_path in all_children:
child_node = Node(child_path, child_depth)
if not self.exclude_node_func(child_node):
yield child_node
# exclusion function could be expensive to compute, so we
# use a generator for child nodes. however, we need to be able
# to detect the last child node within each subtree (needed for
# displaying special end-of-subtree prefix). so we wrap the
# generator in another 'lookahead' generator to detect the last
# item.
for is_last_child, child in yield_with_last_item(children()):
if is_last_child: # last child of its subtree
self.exhausted_levels.add(child_depth)
else:
self.exhausted_levels.discard(child_depth)
# remove exhausted levels that are deeper than the
# current depth (we don't need them anymore)
levels = set(self.exhausted_levels) # copy
self.exhausted_levels.difference_update(
l for l in levels if l > child_depth
)
if isinstance(child, (DirectoryNode, DatasetNode)):
# recurse into subdirectories
yield from self._generate_tree_nodes(child.path)
else:
# it's a file, just yield it
yield child
@increment_node_count
def generate_nodes(self):
"""
Traverse a directory tree starting from the root path.
Yields ``_TreeNode`` objects, each representing a directory or
dataset or file. Nodes are traversed in depth-first order.
Returns
-------
Generator[_TreeNode]
"""
# because the node generator is recursive, we cannot directly
# decorate it with `increment_node_count` (since it would count
# twice whenever the function recurses).
# so we decorate a separate function where we just yield from the
# underlying generator.
yield from self._generate_tree_nodes(self.root)
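# A minimal usage sketch of the class above (hypothetical directory; this
# mirrors what `TreeCommand.__call__` does):
#
#   tree = Tree(Path("/tmp/mydir"), max_depth=2)
#   for node in tree.generate_nodes():
#       print("    " * node.depth, node.path.name)
#   print(tree.node_count)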
class DatasetTree(Tree):
"""
``DatasetTree`` is a ``Tree`` whose depth is determined by the
subdataset hierarchy level, instead of directory depth.
Because of the different semantics of the ``max_depth`` parameter,
we implement a separate subclass of ``Tree``.
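
    For instance (an illustrative sketch), ``DatasetTree(Path("/tmp"),
    max_depth=1, max_dataset_depth=1)`` would report top-level datasets and
    their immediate subdatasets anywhere under ``/tmp``, plus one directory
    level inside each reported dataset.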
"""
def __init__(self, *args, max_dataset_depth=0, **kwargs):
super().__init__(*args, **kwargs)
# by default, do not recurse into datasets' subdirectories (other
# than paths to nested subdatasets)
if self.max_depth is None:
self.max_depth = 0
self.max_dataset_depth = max_dataset_depth
# secondary 'helper' generator that will traverse the whole tree
        # (once) and yield only datasets and their parent directories
self._ds_generator = self._generate_datasets()
# keep track of node paths that have been yielded
self._visited = set([])
# current value of the ds_generator. the generator will be initialized
# lazily, so for now we set the value to a dummy `_TreeNode`
# with an impossible depth just to distinguish it from None
# (None means the generator has finished).
self._next_ds = _TreeNode(self.root, None)
@increment_node_count
def generate_nodes(self):
"""
Yield ``_TreeNode`` objects that belong to the tree.
A ``DatasetTree`` is just an unlimited-depth ``Tree`` with more
complex rules for pruning (skipping traversal of particular nodes).
Each exclusion rule is encoded in a function. The rules are then
combined in a final ``exclusion_func`` which is supplied to the
``Tree`` constructor.
Returns
-------
Generator[_TreeNode]
"""
def exclude_func(node: _TreeNode):
"""Exclusion function -- here is the crux of the logic for
pruning the main tree."""
try:
# initialize dataset(-parent) generator if not done yet
if self._next_ds is not None and \
self._next_ds.depth is None: # dummy depth
self._advance_ds_generator()
if isinstance(node, DatasetNode):
# check if maximum dataset depth is exceeded
is_valid_ds = not self.exclude_node_func(node) and \
node.ds_depth <= self.max_dataset_depth
if is_valid_ds:
self._advance_ds_generator() # go to next dataset(-parent)
return not is_valid_ds
# exclude file or directory underneath a dataset,
# if it has depth (relative to dataset root) > max_depth,
# unless (in case of a directory) it is itself the parent of a
# valid dataset. if it's a parent of a dataset, we don't apply
# any filters -- it's just a means to get to the next dataset.
if not self._is_parent_of_ds(node):
return self.exclude_node_func(node) or \
self._ds_child_node_exceeds_max_depth(node)
except Exception as ex:
CapturedException(ex, level=10) # DEBUG level
lgr.debug(f"Excluding node from tree because "
"an exception occurred while applying the "
f"exclusion filter: '{node.path}'")
return True # exclude by default
return False # do not exclude
tree = Tree(
self.root,
max_depth=None, # unlimited traversal (datasets could be anywhere)
exclude_node_func=exclude_func,
)
# synchronize exhausted levels with the main tree
self.exhausted_levels = tree.exhausted_levels
yield from tree.generate_nodes()
def _advance_ds_generator(self):
"""Go to the next dataset or parent of dataset"""
self._next_ds = next(self._ds_generator, None)
if self._next_ds is not None:
lgr.debug(
f"Next dataset" +
(" parent" if isinstance(self._next_ds, DirectoryNode) else "")
+ f": {self._next_ds.path}")
def _generate_datasets(self):
"""Generator of dataset nodes and their parent directories starting
from below the tree root and up to ``max_dataset_depth`` levels.
This secondary 'helper' tree will be generated in parallel with the
main tree but will be one step ahead, such that it always points to
the next dataset (or dataset parent) relative to the current node in
the main tree.
We can use it to look into downstream/future nodes and decide
efficiently whether to prune the current node in the main tree.
Returns
-------
Generator[DirectoryNode or DatasetNode]
"""
def exclude(n: _TreeNode):
# we won't find any datasets underneath the git folder
return isinstance(n, FileNode) or \
(isinstance(n, DirectoryNode) and n.path.name == ".git")
ds_tree = Tree(
self.root,
max_depth=None,
exclude_node_func=exclude,
)
nodes_below_root = ds_tree.generate_nodes()
next(nodes_below_root) # skip root node
for node in nodes_below_root:
# for each dataset node, yield its parents first, then
# yield the dataset itself
if isinstance(node, DatasetNode) and \
node.ds_depth <= self.max_dataset_depth and \
not self.exclude_node_func(node):
# yield parent directories if not already done
parents_below_root = node.parents[1:] # first parent is root
for par_depth, par_path in enumerate(parents_below_root):
parent = Node(par_path, par_depth)
if parent not in self._visited:
self._visited.add(parent)
yield parent
self._visited.add(node)
yield node
def _ds_child_node_exceeds_max_depth(self, ds_node):
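        """Check whether a node's directory depth, relative to the root of
        its enclosing dataset, exceeds ``self.max_depth``"""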
ds_parent_path = get_dataset_root_datalad_only(ds_node.path)
if ds_parent_path is None:
# it's not a dataset's child, so exclude
return True
if ds_parent_path == self.root:
ds_parent_depth = 0
else:
ds_parent = next((node for node in self._visited
if node.path == ds_parent_path), None)
if ds_parent is None:
# parent is not part of the tree, so exclude child
return True
ds_parent_depth = ds_parent.depth
# check directory depth relative to the dataset parent
rel_depth = ds_node.depth - ds_parent_depth
return rel_depth > self.max_depth
def _is_parent_of_ds(self, node):
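        """Check if the node is the dataset or dataset-parent directory that
        the helper generator currently points to; if so, also advance the
        generator to the next dataset(-parent)"""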
if self._next_ds is None:
return False # no more datasets, can't be a parent
if self._next_ds.path == node.path:
# we hit a dataset or the parent of a dataset
self._advance_ds_generator()
return True