Skip to content

Commit

Permalink
fs: prefer generic.copy over fsspec fs.get()
Browse files Browse the repository at this point in the history
  • Loading branch information
pmrowla committed Jul 24, 2023
1 parent a9d0ac2 commit 81d95ef
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 16 deletions.
2 changes: 1 addition & 1 deletion dvc/dependency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def update(self, rev=None):
self.fs_path = self.fs.path.version_path(self.fs_path, self.meta.version_id)

def download(self, to, jobs=None):
fs_download(self.fs, self.fs_path, to, jobs=jobs)
fs_download(self.fs, self.fs_path, to.fs_path, jobs=jobs)

def save(self):
super().save()
Expand Down
30 changes: 27 additions & 3 deletions dvc/fs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Optional
from urllib.parse import urlparse

from dvc_http import HTTPFileSystem, HTTPSFileSystem # noqa: F401
Expand All @@ -18,7 +19,7 @@
system,
utils,
)
from dvc_objects.fs.base import AnyFSPath, FileSystem # noqa: F401
from dvc_objects.fs.base import AnyFSPath, FileSystem # noqa: F401, TCH001
from dvc_objects.fs.errors import ( # noqa: F401
AuthError,
ConfigError,
Expand Down Expand Up @@ -48,12 +49,35 @@
# pylint: enable=unused-import


def download(fs, fs_path, to, jobs=None):
def download(fs: "FileSystem", fs_path: str, to: str, jobs: Optional[int] = None):
with Callback.as_tqdm_callback(
desc=f"Downloading {fs.path.name(fs_path)}",
unit="files",
) as cb:
fs.get(fs_path, to.fs_path, batch_size=jobs, callback=cb)
# NOTE: We use dvc-objects generic.copy over fs.get since it makes file
# download atomic and avoids fsspec glob/regex path expansion.
if fs.isdir(fs_path):
from_infos = list(fs.find(fs_path))
if not from_infos:
return localfs.makedirs(to, exist_ok=True)
to_infos = [
localfs.path.join(to, *fs.path.relparts(info, fs_path))
for info in from_infos
]
else:
from_infos = [fs_path]
to_infos = [to]

cb.set_size(len(from_infos))
jobs = jobs or fs.jobs
generic.copy(
fs,
from_infos,
localfs,
to_infos,
callback=cb,
batch_size=jobs,
)


def parse_external_url(url, config=None):
Expand Down
19 changes: 7 additions & 12 deletions dvc/repo/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def __init__(self):
def get(url, path, out=None, rev=None, jobs=None, force=False, config=None):
from dvc.config import Config
from dvc.dvcfile import is_valid_filename
from dvc.fs.callbacks import Callback
from dvc.repo import Repo

out = resolve_output(path, out, force=force)
Expand All @@ -40,6 +39,7 @@ def get(url, path, out=None, rev=None, jobs=None, force=False, config=None):
uninitialized=True,
config=config,
) as repo:
from dvc.fs import download
from dvc.fs.data import DataFileSystem

fs: Union[DataFileSystem, "DVCFileSystem"]
Expand All @@ -49,14 +49,9 @@ def get(url, path, out=None, rev=None, jobs=None, force=False, config=None):
else:
fs = repo.dvcfs
fs_path = fs.from_os_path(path)

with Callback.as_tqdm_callback(
desc=f"Downloading {fs.path.name(path)}",
unit="files",
) as cb:
fs.get(
fs_path,
os.path.abspath(out),
batch_size=jobs,
callback=cb,
)
download(
fs,
fs_path,
os.path.abspath(out),
jobs=jobs,
)

0 comments on commit 81d95ef

Please sign in to comment.