Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs.download: prefer generic.copy over fsspec fs.get() #9753

Merged
merged 1 commit into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dvc/dependency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def update(self, rev=None):
self.fs_path = self.fs.path.version_path(self.fs_path, self.meta.version_id)

def download(self, to, jobs=None):
fs_download(self.fs, self.fs_path, to, jobs=jobs)
fs_download(self.fs, self.fs_path, to.fs_path, jobs=jobs)

def save(self):
super().save()
Expand Down
34 changes: 31 additions & 3 deletions dvc/fs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Optional
from urllib.parse import urlparse

from dvc_http import HTTPFileSystem, HTTPSFileSystem # noqa: F401
Expand All @@ -18,7 +19,7 @@
system,
utils,
)
from dvc_objects.fs.base import AnyFSPath, FileSystem # noqa: F401
from dvc_objects.fs.base import AnyFSPath, FileSystem # noqa: F401, TCH001
from dvc_objects.fs.errors import ( # noqa: F401
AuthError,
ConfigError,
Expand Down Expand Up @@ -48,12 +49,39 @@
# pylint: enable=unused-import


def download(fs, fs_path, to, jobs=None):
def download(fs: "FileSystem", fs_path: str, to: str, jobs: Optional[int] = None):
with Callback.as_tqdm_callback(
desc=f"Downloading {fs.path.name(fs_path)}",
unit="files",
) as cb:
fs.get(fs_path, to.fs_path, batch_size=jobs, callback=cb)
# NOTE: We use dvc-objects generic.copy over fs.get since it makes file
# download atomic and avoids fsspec glob/regex path expansion.
if fs.isdir(fs_path):
from_infos = [
path
for path in fs.find(fs_path)
if not path.endswith(fs.path.flavour.sep)
]
if not from_infos:
return localfs.makedirs(to, exist_ok=True)
to_infos = [
localfs.path.join(to, *fs.path.relparts(info, fs_path))
for info in from_infos
]
else:
from_infos = [fs_path]
to_infos = [to]

cb.set_size(len(from_infos))
jobs = jobs or fs.jobs
generic.copy(
fs,
from_infos,
localfs,
to_infos,
callback=cb,
batch_size=jobs,
)


def parse_external_url(url, config=None):
Expand Down
19 changes: 7 additions & 12 deletions dvc/repo/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def __init__(self):
def get(url, path, out=None, rev=None, jobs=None, force=False, config=None):
from dvc.config import Config
from dvc.dvcfile import is_valid_filename
from dvc.fs.callbacks import Callback
from dvc.repo import Repo

out = resolve_output(path, out, force=force)
Expand All @@ -40,6 +39,7 @@ def get(url, path, out=None, rev=None, jobs=None, force=False, config=None):
uninitialized=True,
config=config,
) as repo:
from dvc.fs import download
from dvc.fs.data import DataFileSystem

fs: Union[DataFileSystem, "DVCFileSystem"]
Expand All @@ -49,14 +49,9 @@ def get(url, path, out=None, rev=None, jobs=None, force=False, config=None):
else:
fs = repo.dvcfs
fs_path = fs.from_os_path(path)

with Callback.as_tqdm_callback(
desc=f"Downloading {fs.path.name(path)}",
unit="files",
) as cb:
fs.get(
fs_path,
os.path.abspath(out),
batch_size=jobs,
callback=cb,
)
download(
fs,
fs_path,
os.path.abspath(out),
jobs=jobs,
)
2 changes: 1 addition & 1 deletion dvc/repo/get_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ def get_url(url, out=None, *, config=None, jobs=None, force=False):
src_fs, src_path = parse_external_url(url, config)
if not src_fs.exists(src_path):
raise URLMissingError(url)
download(src_fs, src_path, out, jobs=jobs)
download(src_fs, src_path, out.fs_path, jobs=jobs)
6 changes: 4 additions & 2 deletions tests/func/repro/test_repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dvc.cli import main
from dvc.dvcfile import LOCK_FILE, PROJECT_FILE
from dvc.exceptions import CyclicGraphError, ReproductionError
from dvc.fs import LocalFileSystem, system
from dvc.fs import system
from dvc.output import Output
from dvc.stage import PipelineStage, Stage
from dvc.stage.cache import RunCacheNotSupported
Expand Down Expand Up @@ -624,12 +624,14 @@ def test_force_with_dependencies(
assert stage.outs[0].hash_info != saved_stage.outs[0].hash_info

def test_force_import(self, mocker, tmp_dir, dvc):
from dvc.dependency import base

tmp_dir.dvc_gen("foo", "foo")

ret = main(["import-url", "foo", "bar"])
assert ret == 0

spy_get = mocker.spy(LocalFileSystem, "get")
spy_get = mocker.spy(base, "fs_download")
spy_checkout = mocker.spy(Output, "checkout")

assert main(["unfreeze", "bar.dvc"]) == 0
Expand Down