-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
list: colorize output #3351
list: colorize output #3351
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import os | ||
|
||
|
||
class LsColors(object): | ||
default = "rs=0:di=01;34:ex=01;32" | ||
|
||
def __init__(self, lscolors=None): | ||
self._extensions = {} | ||
self._codes = {} | ||
self._load(lscolors or os.environ.get("LS_COLORS") or LsColors.default) | ||
|
||
def _load(self, lscolors): | ||
for item in lscolors.split(":"): | ||
try: | ||
code, color = item.split("=", 1) | ||
except ValueError: | ||
continue | ||
if code.startswith("*."): | ||
self._extensions[code[1:]] = color | ||
else: | ||
self._codes[code] = color | ||
|
||
def format(self, entry): | ||
text = entry["path"] | ||
|
||
if entry.get("isout", False) and "out" in self._codes: | ||
return self._format(text, code="out") | ||
|
||
if entry.get("isdir", False): | ||
return self._format(text, code="di") | ||
|
||
if entry.get("isexec", False): | ||
return self._format(text, code="ex") | ||
|
||
_, ext = os.path.splitext(text) | ||
return self._format(text, ext=ext) | ||
|
||
def _format(self, text, code=None, ext=None): | ||
val = None | ||
if ext: | ||
val = self._extensions.get(ext, None) | ||
if code: | ||
val = self._codes.get(code, None) | ||
|
||
if not val: | ||
return text | ||
rs = self._codes.get("rs", 0) | ||
return "\033[{}m{}\033[{}m".format(val, text, rs) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,79 +1,126 @@ | ||
import os | ||
import stat | ||
|
||
from dvc.exceptions import PathMissingError, OutputNotFoundError | ||
|
||
|
||
@staticmethod | ||
def ls(url, target=None, rev=None, recursive=None, outs_only=False): | ||
def ls( | ||
url, target=None, rev=None, recursive=None, outs_only=False, | ||
): | ||
"""Methods for getting files and outputs for the repo. | ||
|
||
Args: | ||
url (str): the repo url | ||
target (str, optional): relative path into the repo | ||
rev (str, optional): SHA commit, branch or tag name | ||
recursive (bool, optional): recursively walk the repo | ||
outs_only (bool, optional): show only DVC-artifacts | ||
|
||
Returns: | ||
list of `entry` | ||
|
||
Notes: | ||
`entry` is a dictionary with structure | ||
{ | ||
"path": str, | ||
"isout": bool, | ||
"isdir": bool, | ||
"isexec": bool, | ||
} | ||
""" | ||
from dvc.external_repo import external_repo | ||
from dvc.repo import Repo | ||
from dvc.utils import relpath | ||
|
||
with external_repo(url, rev) as repo: | ||
target_path_info = _get_target_path_info(repo, target) | ||
result = [] | ||
fs_nodes = [] | ||
if isinstance(repo, Repo): | ||
result.extend(_ls_outs_repo(repo, target_path_info, recursive)) | ||
fs_nodes.extend(_ls_outs_repo(repo, target_path_info, recursive)) | ||
|
||
if not outs_only: | ||
result.extend(_ls_files_repo(target_path_info, recursive)) | ||
fs_nodes.extend(_ls_files_repo(target_path_info, recursive)) | ||
|
||
if target and not result: | ||
if target and not fs_nodes: | ||
raise PathMissingError(target, repo, output_only=outs_only) | ||
Suor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def prettify(path_info): | ||
if path_info == target_path_info: | ||
return path_info.name | ||
return relpath(path_info, target_path_info) | ||
fs_nodes = {n["path_info"]: n for n in fs_nodes}.values() | ||
|
||
result = list(set(map(prettify, result))) | ||
result.sort() | ||
return result | ||
def get_entry(fs_node): | ||
path_info = fs_node["path_info"] | ||
path = ( | ||
path_info.name | ||
if path_info == target_path_info | ||
else relpath(path_info, target_path_info) | ||
) | ||
return { | ||
"path": path, | ||
"isout": fs_node.get("isout", False), | ||
"isdir": fs_node.get("isdir", False), | ||
"isexec": fs_node.get("isexec", False), | ||
} | ||
|
||
entries = sorted(map(get_entry, fs_nodes), key=lambda f: f["path"]) | ||
return entries | ||
|
||
def _ls_files_repo(target_path_info, recursive=None): | ||
|
||
def _ls_files_repo(path_info, recursive=None): | ||
from dvc.compat import fspath | ||
from dvc.ignore import CleanTree | ||
from dvc.path_info import PathInfo | ||
from dvc.scm.tree import WorkingTree | ||
|
||
if not os.path.exists(fspath(target_path_info)): | ||
if not os.path.exists(fspath(path_info)): | ||
return [] | ||
|
||
files = [] | ||
tree = CleanTree(WorkingTree(target_path_info)) | ||
tree = CleanTree(WorkingTree(path_info)) | ||
Suor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
for dirpath, dirnames, filenames in tree.walk(target_path_info): | ||
files.extend(map(lambda f: PathInfo(dirpath, f), filenames)) | ||
for dirpath, dirnames, filenames in tree.walk(path_info): | ||
files.extend(PathInfo(dirpath, f) for f in filenames) | ||
if not recursive: | ||
files.extend(map(lambda d: PathInfo(dirpath, d), dirnames)) | ||
files.extend(PathInfo(dirpath, d) for d in dirnames) | ||
break | ||
except NotADirectoryError: | ||
if os.path.isfile(fspath(target_path_info)): | ||
return [target_path_info] | ||
if os.path.isfile(fspath(path_info)): | ||
files = [path_info] | ||
|
||
return files | ||
return [_get_fs_node(f) for f in files] | ||
|
||
|
||
def _ls_outs_repo(repo, target_path_info, recursive=None): | ||
def _ls_outs_repo(repo, path_info, recursive=None): | ||
from dvc.compat import fspath | ||
from dvc.path_info import PathInfo | ||
|
||
try: | ||
outs = repo.find_outs_by_path(fspath(target_path_info), recursive=True) | ||
outs = repo.find_outs_by_path(fspath(path_info), recursive=True) | ||
except OutputNotFoundError: | ||
return [] | ||
|
||
if recursive: | ||
return [out.path_info for out in outs] | ||
|
||
def get_top_part(path_info): | ||
relpath = path_info.relpath(target_path_info) | ||
return [_get_fs_node(out.path_info, out) for out in outs] | ||
|
||
def get_first_segment(out): | ||
"""Returns tuple with path_info and related out | ||
|
||
path_info calculated as the first relpath segment | ||
Example: | ||
dir/file -> dir | ||
dir/subdir/file -> dir | ||
file -> file | ||
""" | ||
relpath = out.path_info.relpath(path_info) | ||
if relpath.parts: | ||
return PathInfo(target_path_info, relpath.parts[0]) | ||
return path_info | ||
out_path_info = PathInfo(path_info, relpath.parts[0]) | ||
isout = len(relpath.parts) == 1 | ||
return (out_path_info, out if isout else None) | ||
return (out.path_info, out) | ||
|
||
return list({get_top_part(out.path_info) for out in outs}) | ||
return [ | ||
_get_fs_node(p, out) | ||
for (p, out) in {get_first_segment(out) for out in outs} | ||
] | ||
|
||
|
||
def _get_target_path_info(repo, target=None): | ||
|
@@ -82,3 +129,26 @@ def _get_target_path_info(repo, target=None): | |
if not target: | ||
return PathInfo(repo.root_dir) | ||
return PathInfo(repo.root_dir, target) | ||
|
||
|
||
def _get_fs_node(path_info, out=None): | ||
Suor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
from dvc.compat import fspath | ||
|
||
if out: | ||
isdir = out.is_dir_checksum if out.checksum else False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there is no checksum it might still be dir, I would write a note here about this edge case. Now the code looks quirky for no apparent reason. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I got if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it fails, we don't really know whether that is a dir. We may just say about this limitation in a comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In what case the out can be dir but doesn't have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Suor it was an open question - could you please provide the case when it can happen? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When a user edits a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
isexec = False | ||
else: | ||
try: | ||
isdir = os.path.isdir(fspath(path_info)) | ||
mode = os.stat(fspath(path_info)).st_mode | ||
isexec = mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) | ||
except FileNotFoundError: | ||
isdir = False | ||
isexec = False | ||
|
||
return { | ||
"path_info": path_info, | ||
"isout": bool(out), | ||
"isdir": isdir, | ||
"isexec": isexec, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or, maybe, even use a lambda?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've remembered why I did it as it is
with lambda there is an E371 style error
Unfortunately with you suggestion I am getting F811
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JIoJIaJIu, It's fine to ignore redefinition warnings (it's what I suggested). But, I'm fine as-is, it's only a suggestion.