Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][Showcase] dvc: use dynamic scope to stop callback/flag passing #2957

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from voluptuous import Any

import dvc.prompt as prompt
from dvc.progress import flags, noop
from dvc.cache import NamedCache
from dvc.exceptions import CollectCacheError
from dvc.exceptions import DvcException
Expand Down Expand Up @@ -282,12 +283,10 @@ def verify_metric(self):
def download(self, to):
self.remote.download(self.path_info, to.path_info)

def checkout(
self, force=False, progress_callback=None, tag=None, relink=False
):
def checkout(self, force=False, tag=None, relink=False):
if not self.use_cache:
if progress_callback:
progress_callback(str(self.path_info), self.get_files_number())
pbar = flags.tqdm or noop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would make sense to make `pbar` a part of `flags` (or some other structure) instead of writing `flags.tqdm or noop` everywhere?

pbar.update_desc(str(self), self.get_files_number())
return None

if tag:
Expand All @@ -296,11 +295,7 @@ def checkout(
info = self.info

return self.cache.checkout(
self.path_info,
info,
force=force,
progress_callback=progress_callback,
relink=relink,
self.path_info, info, force=force, relink=relink
)

def remove(self, ignore_remove=False):
Expand Down
40 changes: 40 additions & 0 deletions dvc/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def __init__(
if file is None:
file = sys.stderr
self.desc_persist = desc
# disable by context flag
if flags.tqdm_disable:
disable = True
# auto-disable based on `logger.level`
if disable is None:
disable = logger.getEffectiveLevel() > level
Expand Down Expand Up @@ -131,3 +134,40 @@ def format_dict(self):
d["ncols_desc"] = 1
d["prefix"] = ""
return d


# Adding this here for showcase only
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

from contextlib import contextmanager # noqa


class ContextFlags:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Suor, what about using this instead?

from contextlib import contextmanager
from collections import UserDict


class ContextFlags(UserDict):
    """Dict-backed registry of dynamically scoped flags.

    Flags are read as attributes (``flags.name``); a flag that is not
    currently set reads as ``None``.  Calling the instance with keyword
    arguments returns a context manager that sets those flags for the
    duration of a ``with`` block and restores the previous state on exit.
    """

    def __getattr__(self, name):
        """Return the current value of flag *name*, or ``None`` if unset.

        Only invoked when normal attribute lookup fails.
        """
        # ``data`` itself may be missing (e.g. while copying/unpickling, before
        # ``__init__`` ran); looking it up here would recurse forever.  Dunder
        # probes must also fail normally instead of yielding ``None``.
        if name == "data" or (name.startswith("__") and name.endswith("__")):
            raise AttributeError(name)
        return self.data.get(name)

    @contextmanager
    def __call__(self, **flags):
        """Set *flags* inside a ``with`` block, then restore prior values."""
        missing = object()  # sentinel: flag was not set before entering
        previous = {key: self.data.get(key, missing) for key in flags}
        self.data.update(flags)
        try:
            yield
        finally:
            # Restore rather than delete, so nested/overriding scopes do not
            # wipe out the value an outer scope established.  ``finally``
            # guarantees cleanup even if the ``with`` body raises.
            for key, old in previous.items():
                if old is missing:
                    self.data.pop(key, None)
                else:
                    self.data[key] = old

def __init__(self):
self._stack = []

def __call__(self, **values):
return _flags(self._stack, values)

def __getattr__(self, name):
for d in reversed(self._stack):
if name in d:
return d[name]
else:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this indented correctly?

return None


flags = ContextFlags()


@contextmanager
def _flags(stack, values):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woah, this is clever 👌

stack.append(values)
yield
stack.pop()


class Noop:
    """Null object: every attribute is a callable that does nothing.

    Lets callers write ``(flags.tqdm or noop).update_desc(...)`` without
    checking whether a progress bar is active.
    """

    def __getattr__(self, name):
        """Return a callable that accepts any arguments and returns ``None``."""
        # A function returned from ``__getattr__`` is NOT bound to the
        # instance, so it must not declare ``self``: otherwise a call with no
        # arguments raises TypeError and the first real argument is swallowed.
        return lambda *a, **kw: None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this the same as doing the following?

def __getattr__(self, name):
    pass



noop = Noop()
12 changes: 4 additions & 8 deletions dvc/remote/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,17 @@ def _list_paths(self, bucket, prefix):
def list_cache_paths(self):
return self._list_paths(self.path_info.bucket, self.path_info.path)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
def _upload(self, from_file, to_info, name=None, **_kwargs):
with Tqdm(desc=name, bytes=True) as pbar:
self.blob_service.create_blob_from_path(
to_info.bucket,
to_info.path,
from_file,
progress_callback=pbar.update_to,
)

def _download(
self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
def _download(self, from_info, to_file, name=None, **_kwargs):
with Tqdm(desc=name, bytes=True) as pbar:
self.blob_service.get_blob_to_path(
from_info.bucket,
from_info.path,
Expand Down
91 changes: 24 additions & 67 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from dvc.ignore import DvcIgnore
from dvc.path_info import PathInfo, URLInfo
from dvc.progress import Tqdm
from dvc.progress import Tqdm, flags, noop
from dvc.remote.slow_link_detection import slow_link_guard
from dvc.state import StateNoop
from dvc.utils import makedirs, relpath, tmp_fname
Expand Down Expand Up @@ -238,7 +238,8 @@ def _get_dir_info_checksum(self, dir_info):

from_info = PathInfo(tmp)
to_info = self.cache.path_info / tmp_fname("")
self.cache.upload(from_info, to_info, no_progress_bar=True)
with flags(tqdm_disable=True):
self.cache.upload(from_info, to_info)

checksum = self.get_file_checksum(to_info) + self.CHECKSUM_DIR_SUFFIX
return checksum, to_info
Expand Down Expand Up @@ -516,7 +517,7 @@ def _save(self, path_info, checksum):
return
self._save_file(path_info, checksum)

def upload(self, from_info, to_info, name=None, no_progress_bar=False):
def upload(self, from_info, to_info, name=None):
if not hasattr(self, "_upload"):
raise RemoteActionNotImplemented("upload", self.scheme)

Expand All @@ -531,12 +532,7 @@ def upload(self, from_info, to_info, name=None, no_progress_bar=False):
name = name or from_info.name

try:
self._upload(
from_info.fspath,
to_info,
name=name,
no_progress_bar=no_progress_bar,
)
self._upload(from_info.fspath, to_info, name=name)
except Exception:
msg = "failed to upload '{}' to '{}'"
logger.exception(msg.format(from_info, to_info))
Expand All @@ -545,13 +541,7 @@ def upload(self, from_info, to_info, name=None, no_progress_bar=False):
return 0

def download(
self,
from_info,
to_info,
name=None,
no_progress_bar=False,
file_mode=None,
dir_mode=None,
self, from_info, to_info, name=None, file_mode=None, dir_mode=None
):
if not hasattr(self, "_download"):
raise RemoteActionNotImplemented("download", self.scheme)
Expand All @@ -568,15 +558,13 @@ def download(

if self.isdir(from_info):
return self._download_dir(
from_info, to_info, name, no_progress_bar, file_mode, dir_mode
from_info, to_info, name, file_mode, dir_mode
)
return self._download_file(
from_info, to_info, name, no_progress_bar, file_mode, dir_mode
from_info, to_info, name, file_mode, dir_mode
)

def _download_dir(
self, from_info, to_info, name, no_progress_bar, file_mode, dir_mode
):
def _download_dir(self, from_info, to_info, name, file_mode, dir_mode):
from_infos = list(self.walk_files(from_info))
to_infos = (
to_info / info.relative_to(from_info) for info in from_infos
Expand All @@ -586,7 +574,6 @@ def _download_dir(
download_files = partial(
self._download_file,
name=name,
no_progress_bar=True,
file_mode=file_mode,
dir_mode=dir_mode,
)
Expand All @@ -596,13 +583,10 @@ def _download_dir(
total=len(from_infos),
desc="Downloading directory",
unit="Files",
disable=no_progress_bar,
) as futures:
) as futures, flags(tqdm_disable=True):
return sum(futures)

def _download_file(
self, from_info, to_info, name, no_progress_bar, file_mode, dir_mode
):
def _download_file(self, from_info, to_info, name, file_mode, dir_mode):
makedirs(to_info.parent, exist_ok=True, mode=dir_mode)

logger.debug("Downloading '{}' to '{}'".format(from_info, to_info))
Expand All @@ -611,9 +595,7 @@ def _download_file(
tmp_file = tmp_fname(to_info)

try:
self._download(
from_info, tmp_file, name=name, no_progress_bar=no_progress_bar
)
self._download(from_info, tmp_file, name=name)
except Exception:
msg = "failed to download '{}' to '{}'"
logger.exception(msg.format(from_info, to_info))
Expand Down Expand Up @@ -813,9 +795,7 @@ def safe_remove(self, path_info, force=False):

self.remove(path_info)

def _checkout_file(
self, path_info, checksum, force, progress_callback=None
):
def _checkout_file(self, path_info, checksum, force):
"""The file is changed we need to checkout a new copy"""
cache_info = self.checksum_to_path_info(checksum)
if self.exists(path_info):
Expand All @@ -826,18 +806,15 @@ def _checkout_file(
self.link(cache_info, path_info)
self.state.save_link(path_info)
self.state.save(path_info, checksum)
if progress_callback:
progress_callback(str(path_info))
(flags.tqdm or noop).update_desc(str(path_info))

def makedirs(self, path_info):
"""Optional: Implement only if the remote needs to create
directories before copying/linking/moving data
"""
pass

def _checkout_dir(
self, path_info, checksum, force, progress_callback=None, relink=False
):
def _checkout_dir(self, path_info, checksum, force, relink=False):
# Create dir separately so that dir is created
# even if there are no files in it
if not self.exists(path_info):
Expand All @@ -858,8 +835,8 @@ def _checkout_dir(
self.safe_remove(entry_info, force=force)
self.link(entry_cache_info, entry_info)
self.state.save(entry_info, entry_checksum)
if progress_callback:
progress_callback(str(entry_info))

(flags.tqdm or noop).update_desc(str(entry_info))

self._remove_redundant_files(path_info, dir_info, force)

Expand All @@ -876,14 +853,7 @@ def _remove_redundant_files(self, path_info, dir_info, force):
for path in existing_files - needed_files:
self.safe_remove(path, force)

def checkout(
self,
path_info,
checksum_info,
force=False,
progress_callback=None,
relink=False,
):
def checkout(self, path_info, checksum_info, force=False, relink=False):
if path_info.scheme not in ["local", self.scheme]:
raise NotImplementedError

Expand All @@ -910,33 +880,20 @@ def checkout(
failed = path_info

if failed or skip:
if progress_callback:
progress_callback(
str(path_info), self.get_files_number(checksum)
)
pbar = flags.tqdm or noop
pbar.update_desc(str(path_info), self.get_files_number(checksum))
return failed

msg = "Checking out '{}' with cache '{}'."
logger.debug(msg.format(str(path_info), checksum))

self._checkout(path_info, checksum, force, progress_callback, relink)
self._checkout(path_info, checksum, force, relink)
return None

def _checkout(
self,
path_info,
checksum,
force=False,
progress_callback=None,
relink=False,
):
def _checkout(self, path_info, checksum, force=False, relink=False):
if not self.is_dir_checksum(checksum):
return self._checkout_file(
path_info, checksum, force, progress_callback=progress_callback
)
return self._checkout_dir(
path_info, checksum, force, progress_callback, relink
)
return self._checkout_file(path_info, checksum, force)
return self._checkout_dir(path_info, checksum, force, relink)

def get_files_number(self, checksum):
if not checksum:
Expand Down
29 changes: 8 additions & 21 deletions dvc/remote/gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,15 @@ def init_drive(self):
)

@gdrive_retry
def gdrive_upload_file(
self, args, no_progress_bar=True, from_file="", progress_name=""
):
def gdrive_upload_file(self, args, from_file, name):
item = self.drive.CreateFile(
{"title": args["title"], "parents": [{"id": args["parent_id"]}]}
)

with open(from_file, "rb") as fobj:
total = os.path.getsize(from_file)
with Tqdm.wrapattr(
fobj,
"read",
desc=progress_name,
total=total,
disable=no_progress_bar,
fobj, "read", desc=name, total=total
) as wrapped:
# PyDrive doesn't like content property setting for empty files
# https://github.com/gsuitedevs/PyDrive/issues/121
Expand All @@ -113,17 +107,13 @@ def gdrive_upload_file(
return item

@gdrive_retry
def gdrive_download_file(
self, file_id, to_file, progress_name, no_progress_bar
):
def gdrive_download_file(self, file_id, to_file, name):
gdrive_file = self.drive.CreateFile({"id": file_id})
bar_format = (
"Donwloading {desc:{ncols_desc}.{ncols_desc}}... "
+ Tqdm.format_sizeof(int(gdrive_file["fileSize"]), "B", 1024)
)
with Tqdm(
bar_format=bar_format, desc=progress_name, disable=no_progress_bar
):
with Tqdm(bar_format=bar_format, desc=name):
gdrive_file.GetContentFile(to_file)

def gdrive_list_item(self, query):
Expand Down Expand Up @@ -271,23 +261,20 @@ def get_remote_id(self, path_info, create=False):
def exists(self, path_info):
return self.get_remote_id(path_info) != ""

def _upload(self, from_file, to_info, name, no_progress_bar):
def _upload(self, from_file, to_info, name=None):
dirname = to_info.parent
if dirname:
parent_id = self.get_remote_id(dirname, True)
else:
parent_id = to_info.bucket

self.gdrive_upload_file(
{"title": to_info.name, "parent_id": parent_id},
no_progress_bar,
from_file,
name,
{"title": to_info.name, "parent_id": parent_id}, from_file, name
)

def _download(self, from_info, to_file, name, no_progress_bar):
def _download(self, from_info, to_file, name=None):
file_id = self.get_remote_id(from_info)
self.gdrive_download_file(file_id, to_file, name, no_progress_bar)
self.gdrive_download_file(file_id, to_file, name)

def all(self):
if not self.cached_ids:
Expand Down
Loading