From 634d350917d6e624c063e6a0bd2a3b99901ef892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Trifir=C3=B2?= Date: Fri, 15 Jul 2022 15:07:58 +0200 Subject: [PATCH] import-url/update: add --no-download flag add --no-download flag to dvc import-url/dvc update to only create/update .dvc files without downloading the associated data. Created .dvc files can be fetched using `dvc pull` or can be updated using `dvc update --no-download`. --- dvc/commands/imp.py | 13 +++++++-- dvc/commands/imp_url.py | 13 +++++++-- dvc/commands/update.py | 8 ++++++ dvc/output.py | 5 ++++ dvc/repo/__init__.py | 41 +++++++++++++++++++++++++- dvc/repo/fetch.py | 34 ++++++++++++++++++++++ dvc/repo/imp_url.py | 7 +++-- dvc/repo/index.py | 22 ++++++++++++++ dvc/repo/update.py | 14 ++++++++- dvc/stage/__init__.py | 46 +++++++++++++++++++++++++----- dvc/stage/exceptions.py | 5 ++++ dvc/stage/imports.py | 34 +++++++++++++++++++--- tests/func/test_data_cloud.py | 14 +++++++++ tests/func/test_import_url.py | 22 ++++++++++++++ tests/func/test_update.py | 27 ++++++++++++++++++ tests/unit/command/test_imp.py | 38 ++++++++++++++++++++++++ tests/unit/command/test_imp_url.py | 30 +++++++++++++++---- tests/unit/command/test_update.py | 2 ++ 18 files changed, 349 insertions(+), 26 deletions(-) diff --git a/dvc/commands/imp.py b/dvc/commands/imp.py index a31f69ecafc..8b7df178518 100644 --- a/dvc/commands/imp.py +++ b/dvc/commands/imp.py @@ -19,6 +19,7 @@ def run(self): fname=self.args.file, rev=self.args.rev, no_exec=self.args.no_exec, + no_download=self.args.no_download, desc=self.args.desc, jobs=self.args.jobs, ) @@ -69,11 +70,19 @@ def add_parser(subparsers, parent_parser): help="Specify name of the .dvc file this command will generate.", metavar="", ) - import_parser.add_argument( + no_download_exec_group = import_parser.add_mutually_exclusive_group() + no_download_exec_group.add_argument( "--no-exec", action="store_true", default=False, - help="Only create .dvc file without actually downloading it.", + help="Only create .dvc file without actually importing target data.", + ) + no_download_exec_group.add_argument( + "--no-download", + action="store_true", + default=False, + help="Create .dvc file including target data hash value(s)" + " but do not actually download the file(s).", ) import_parser.add_argument( "--desc", diff --git a/dvc/commands/imp_url.py b/dvc/commands/imp_url.py index 0a8d0a15d6a..add80884c0f 100644 --- a/dvc/commands/imp_url.py +++ b/dvc/commands/imp_url.py @@ -17,6 +17,7 @@ def run(self): out=self.args.out, fname=self.args.file, no_exec=self.args.no_exec, + no_download=self.args.no_download, remote=self.args.remote, to_remote=self.args.to_remote, desc=self.args.desc, @@ -66,11 +67,19 @@ def add_parser(subparsers, parent_parser): help="Specify name of the .dvc file this command will generate.", metavar="", ).complete = completion.DIR - import_parser.add_argument( + no_download_exec_group = import_parser.add_mutually_exclusive_group() + no_download_exec_group.add_argument( "--no-exec", action="store_true", default=False, - help="Only create .dvc file without actually downloading it.", + help="Only create .dvc file without actually importing target data.", + ) + no_download_exec_group.add_argument( + "--no-download", + action="store_true", + default=False, + help="Create .dvc file including target data hash value(s)" + " but do not actually download the file(s).", ) import_parser.add_argument( "--to-remote", diff --git a/dvc/commands/update.py b/dvc/commands/update.py index 5eee77b5aa2..7461cac9afc 100644 --- a/dvc/commands/update.py +++ b/dvc/commands/update.py @@ -18,6 +18,7 @@ def run(self): rev=self.args.rev, recursive=self.args.recursive, to_remote=self.args.to_remote, + no_download=self.args.no_download, remote=self.args.remote, jobs=self.args.jobs, ) @@ -55,6 +56,13 @@ def add_parser(subparsers, parent_parser): default=False, help="Update all stages in the specified directory.", ) + update_parser.add_argument( + "--no-download", + action="store_true", + default=False, + help="Update .dvc file hash value(s)" + " but do not download the file(s).", + ) update_parser.add_argument( "--to-remote", action="store_true", diff --git a/dvc/output.py b/dvc/output.py index 3ea5d754e2a..f4630b572ef 100644 --- a/dvc/output.py +++ b/dvc/output.py @@ -375,6 +375,11 @@ def __str__(self): return self.fs.path.relpath(self.fs_path, self.repo.root_dir) + def clear(self): + self.hash_info = HashInfo.from_dict({}) + self.meta = Meta.from_dict({}) + self.obj = None + @property def protocol(self): return self.fs.protocol diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index eb63e41bd6f..55151f09259 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -3,7 +3,7 @@ from collections import defaultdict from contextlib import contextmanager from functools import wraps -from typing import TYPE_CHECKING, Callable, Optional +from typing import TYPE_CHECKING, Callable, List, Optional from funcy import cached_property @@ -18,6 +18,7 @@ from dvc.fs import FileSystem from dvc.repo.scm_context import SCMContext from dvc.scm import Base + from dvc.stage import Stage logger = logging.getLogger(__name__) @@ -444,6 +445,44 @@ def used_objs( return used + def partial_imports( + self, + targets=None, + all_branches=False, + all_tags=False, + all_commits=False, + all_experiments=False, + commit_date: Optional[str] = None, + recursive=False, + revs=None, + num=1, + ) -> List["Stage"]: + """Get the stages related to the given target and collect dependencies + which are missing outputs. + + This is useful to retrieve files which have been imported to the repo + using --no-download. + + Returns: + A list of partially imported stages + """ + from itertools import chain + + partial_imports = chain.from_iterable( + self.index.partial_imports(targets, recursive=recursive) + for _ in self.brancher( + revs=revs, + all_branches=all_branches, + all_tags=all_tags, + all_commits=all_commits, + all_experiments=all_experiments, + commit_date=commit_date, + num=num, + ) + ) + + return list(partial_imports) + @property def stages(self): # obsolete, only for backward-compatibility return self.index.stages diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index eaa3a76cb4a..213e1efdb4a 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -93,6 +93,18 @@ def fetch( downloaded += d failed += f + d, f = _fetch_partial_imports( + self, + targets, + all_branches=all_branches, + all_tags=all_tags, + all_commits=all_commits, + recursive=recursive, + revs=revs, + ) + downloaded += d + failed += f + if failed: raise DownloadError(failed) @@ -107,3 +119,25 @@ def _fetch(repo, obj_ids, **kwargs): except FileTransferError as exc: failed += exc.amount return downloaded, failed + + +def _fetch_partial_imports(repo, targets, **kwargs): + from dvc.stage.exceptions import DataSourceChanged + + downloaded = 0 + failed = 0 + for stage in repo.partial_imports(targets, **kwargs): + try: + stage.run() + except DataSourceChanged as exc: + logger.warning(f"{exc}") + failed += 1 + continue + if not any( + kwargs.get(kw, None) + for kw in ("all_branches", "all_tags", "all_commits", "revs") + ): + stage.dump() + + downloaded += 1 + return downloaded, failed diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index 35a684f6659..fc2f709a079 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -17,6 +17,7 @@ def imp_url( fname=None, erepo=None, frozen=True, + no_download=False, no_exec=False, remote=None, to_remote=False, @@ -31,9 +32,9 @@ def imp_url( self, out, always_local=to_remote and not out ) - if to_remote and no_exec: + if to_remote and (no_exec or no_download): raise InvalidArgumentError( - "--no-exec can't be combined with --to-remote" + "--no-exec/--no-download cannot be combined with --to-remote" ) if not to_remote and remote: @@ -80,7 +81,7 @@ def imp_url( stage.save_deps() stage.md5 = stage.compute_md5() else: - stage.run(jobs=jobs) + stage.run(jobs=jobs, no_download=no_download) stage.frozen = frozen diff --git a/dvc/repo/index.py b/dvc/repo/index.py index 9e3fa1a0504..acaa2e94482 100644 --- a/dvc/repo/index.py +++ b/dvc/repo/index.py @@ -246,6 +246,28 @@ def used_objs( used[odb].update(objs) return used + def partial_imports( + self, + targets: "TargetType" = None, + recursive: bool = False, + ) -> List["Stage"]: + from itertools import chain + + from dvc.utils.collections import ensure_list + + collect_targets: Sequence[Optional[str]] = (None,) + if targets: + collect_targets = ensure_list(targets) + + pairs = chain.from_iterable( + self.stage_collector.collect_granular( + target, recursive=recursive, with_deps=True + ) + for target in collect_targets + ) + + return [stage for stage, _ in pairs if stage.is_partial_import] + # Following methods help us treat the collection as a set-like structure # and provides faux-immutability. # These methods do not preserve stages order. diff --git a/dvc/repo/update.py b/dvc/repo/update.py index d4ec8a9ffde..9c620a0cafd 100644 --- a/dvc/repo/update.py +++ b/dvc/repo/update.py @@ -10,6 +10,7 @@ def update( rev=None, recursive=False, to_remote=False, + no_download=False, remote=None, jobs=None, ): @@ -21,6 +22,11 @@ def update( if isinstance(targets, str): targets = [targets] + if to_remote and no_download: + raise InvalidArgumentError( + "--to-remote can't be used with --no-download" + ) + if not to_remote and remote: raise InvalidArgumentError( "--remote can't be used without --to-remote" @@ -31,7 +37,13 @@ def update( stages.update(self.stage.collect(target, recursive=recursive)) for stage in stages: - stage.update(rev, to_remote=to_remote, remote=remote, jobs=jobs) + stage.update( + rev, + to_remote=to_remote, + remote=remote, + no_download=no_download, + jobs=jobs, + ) dvcfile = Dvcfile(self, stage.path) dvcfile.dump(stage) diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 5f4f02f7094..02e0cb103bf 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -78,6 +78,7 @@ def create_stage(cls, repo, path, external=False, **kwargs): wdir = os.path.abspath(kwargs.get("wdir", None) or os.curdir) path = os.path.abspath(path) + check_dvcfile_path(repo, path) check_stage_path(repo, wdir, is_wdir=kwargs.get("wdir")) check_stage_path(repo, os.path.dirname(path)) @@ -242,6 +243,14 @@ def is_import(self): """Whether the DVC file was created with `dvc import`.""" return not self.cmd and len(self.deps) == 1 and len(self.outs) == 1 + @property + def is_partial_import(self) -> bool: + """ + Whether the DVC file was created using `dvc import --no-download` + or `dvc import-url --no-download`. + """ + return self.is_import and (not self.outs[0].hash_info) + @property def is_repo_import(self): if not self.is_import: @@ -433,11 +442,23 @@ def reproduce(self, interactive=False, **kwargs): return self - def update(self, rev=None, to_remote=False, remote=None, jobs=None): + def update( + self, + rev=None, + to_remote=False, + remote=None, + no_download=None, + jobs=None, + ): if not (self.is_repo_import or self.is_import): raise StageUpdateError(self.relpath) update_import( - self, rev=rev, to_remote=to_remote, remote=remote, jobs=jobs + self, + rev=rev, + to_remote=to_remote, + remote=remote, + no_download=no_download, + jobs=jobs, ) def reload(self): @@ -457,6 +478,7 @@ def compute_md5(self): def save(self, allow_missing=False): self.save_deps(allow_missing=allow_missing) + self.save_outs(allow_missing=allow_missing) self.md5 = self.compute_md5() @@ -525,14 +547,17 @@ def run( no_commit=False, force=False, allow_missing=False, + no_download=False, **kwargs, ): if (self.cmd or self.is_import) and not self.frozen and not dry: self.remove_outs(ignore_remove=False, force=False) - if not self.frozen and self.is_import: + if (not self.frozen and self.is_import) or self.is_partial_import: jobs = kwargs.get("jobs", None) - self._sync_import(dry, force, jobs) + self._sync_import( + dry, force, jobs, no_download, check_changed=self.frozen + ) elif not self.frozen and self.cmd: self._run_stage(dry, force, **kwargs) else: @@ -544,7 +569,7 @@ def run( self._check_missing_outputs() if not dry: - if kwargs.get("checkpoint_func", None): + if kwargs.get("checkpoint_func", None) or no_download: allow_missing = True self.save(allow_missing=allow_missing) if not no_commit: @@ -555,8 +580,10 @@ def _run_stage(self, dry, force, **kwargs): return run_stage(self, dry, force, **kwargs) @rwlocked(read=["deps"], write=["outs"]) - def _sync_import(self, dry, force, jobs): - sync_import(self, dry, force, jobs) + def _sync_import( + self, dry, force, jobs, no_download, check_changed: bool = False + ): + sync_import(self, dry, force, jobs, no_download, check_changed) @rwlocked(read=["outs"]) def _check_missing_outputs(self): @@ -571,6 +598,8 @@ def _func(o): @rwlocked(write=["outs"]) def checkout(self, allow_missing=False, **kwargs): stats = defaultdict(list) + if self.is_partial_import: + return {} for out in self.filter_outs(kwargs.get("filter_info")): key, outs = self._checkout( out, @@ -658,6 +687,9 @@ def get_used_objs( self, *args, **kwargs ) -> Dict[Optional["ObjectDB"], Set["HashInfo"]]: """Return set of object IDs used by this stage.""" + if self.is_partial_import: + return {} + used_objs = defaultdict(set) for out in self.filter_outs(kwargs.get("filter_info")): for odb, objs in out.get_used_objs(*args, **kwargs).items(): diff --git a/dvc/stage/exceptions.py b/dvc/stage/exceptions.py index 89644cd29ac..1761109b770 100644 --- a/dvc/stage/exceptions.py +++ b/dvc/stage/exceptions.py @@ -80,6 +80,11 @@ def __init__(self, missing_files): super().__init__(msg) +class DataSourceChanged(DvcException): + def __init__(self, path: str): + super().__init__(f"data source changed: {path}") + + class StageNotFound(DvcException, KeyError): def __init__(self, file, name): self.file = file.relpath diff --git a/dvc/stage/imports.py b/dvc/stage/imports.py index 35f593df76b..23316ab42f1 100644 --- a/dvc/stage/imports.py +++ b/dvc/stage/imports.py @@ -17,20 +17,38 @@ def _update_import_on_remote(stage, remote, jobs): stage.outs[0].transfer(url, odb=odb, jobs=jobs, update=True) -def update_import(stage, rev=None, to_remote=False, remote=None, jobs=None): +def update_import( + stage, rev=None, to_remote=False, remote=None, no_download=None, jobs=None +): stage.deps[0].update(rev=rev) + outs = stage.outs + deps = stage.deps + frozen = stage.frozen + outs = stage.outs stage.frozen = False + + if stage.outs: + stage.outs[0].clear() try: if to_remote: _update_import_on_remote(stage, remote, jobs) else: - stage.reproduce(jobs=jobs) + stage.reproduce(no_download=no_download, jobs=jobs) finally: + if deps == stage.deps: + stage.outs = outs stage.frozen = frozen -def sync_import(stage, dry=False, force=False, jobs=None): +def sync_import( + stage, + dry=False, + force=False, + jobs=None, + no_download=False, + check_changed=False, +): """Synchronize import's outs to the workspace.""" logger.info("Importing '%s' -> '%s'", stage.deps[0], stage.outs[0]) if dry: @@ -39,5 +57,13 @@ def sync_import(stage, dry=False, force=False, jobs=None): if not force and stage.already_cached(): stage.outs[0].checkout() else: + if check_changed: + old_hash_info = stage.deps[0].hash_info stage.save_deps() - stage.deps[0].download(stage.outs[0], jobs=jobs) + if check_changed and not old_hash_info == stage.deps[0].hash_info: + from dvc.stage.exceptions import DataSourceChanged + + raise DataSourceChanged(f"{stage} ({stage.deps[0]})") + + if not no_download: + stage.deps[0].download(stage.outs[0], jobs=jobs) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 192e350f734..bc73948325a 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -273,6 +273,20 @@ def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): assert (tmp_dir / "new_dir" / "bar").read_text() == "bar" +def test_pull_partial_import(tmp_dir, dvc, local_workspace): + local_workspace.gen("file", "file content") + dst = tmp_dir / "file" + stage = dvc.imp_url( + "remote://workspace/file", os.fspath(dst), no_download=True + ) + + result = dvc.pull("file") + assert result["fetched"] == 1 + assert dst.exists() + + assert stage.outs[0].get_hash().value == "d10b4c3ff123b26dc068d43a8bef2d23" + + def test_pull_external_dvc_imports_mixed( tmp_dir, dvc, scm, erepo_dir, local_remote ): diff --git a/tests/func/test_import_url.py b/tests/func/test_import_url.py index 33e79e4081d..8c6cd09c719 100644 --- a/tests/func/test_import_url.py +++ b/tests/func/test_import_url.py @@ -267,3 +267,25 @@ def test_import_url_to_remote_status(tmp_dir, dvc, local_cloud, local_remote): status = dvc.status() assert len(status) == 0 + + +def test_import_url_no_download(tmp_dir, dvc, local_workspace): + local_workspace.gen("file", "file content") + dst = tmp_dir / "file" + stage = dvc.imp_url( + "remote://workspace/file", os.fspath(dst), no_download=True + ) + + assert stage.deps[0].hash_info.value == "d10b4c3ff123b26dc068d43a8bef2d23" + + assert not dst.exists() + + out = stage.outs[0] + assert out.hash_info.value is None + assert out.hash_info.name is None + assert out.meta.size is None + + status = dvc.status() + assert status["file.dvc"] == [ + {"changed outs": {"file": "deleted"}}, + ] diff --git a/tests/func/test_update.py b/tests/func/test_update.py index 37f98f314a5..eb4e3cf82ea 100644 --- a/tests/func/test_update.py +++ b/tests/func/test_update.py @@ -176,6 +176,33 @@ def test_update_import_url(tmp_dir, dvc, workspace): assert dst.read_text() == "updated file content" +def test_update_import_url_no_download(tmp_dir, dvc, workspace): + workspace.gen("file", "file content") + + dst = tmp_dir / "imported_file" + stage = dvc.imp_url( + "remote://workspace/file", os.fspath(dst), no_download=True + ) + hash_info = stage.deps[0].hash_info + + assert not dst.is_file() + assert stage.deps[0].hash_info.value == "d10b4c3ff123b26dc068d43a8bef2d23" + + workspace.gen("file", "updated file content") + + updated_stage = dvc.update([stage.path], no_download=True)[0] + assert not dst.exists() + + updated_hash_info = updated_stage.deps[0].hash_info + assert updated_hash_info != hash_info + assert updated_hash_info.value == "6ffba511ce3aa40b8231d1b1f8c5fba5" + + out = updated_stage.outs[0] + assert out.hash_info.value is None + assert out.hash_info.name is None + assert out.meta.size is None + + def test_update_rev(tmp_dir, dvc, scm, git_dir): with git_dir.chdir(): git_dir.scm_gen({"foo": "foo"}, commit="first") diff --git a/tests/unit/command/test_imp.py b/tests/unit/command/test_imp.py index f00e3f0db32..c8965e3a339 100644 --- a/tests/unit/command/test_imp.py +++ b/tests/unit/command/test_imp.py @@ -34,6 +34,7 @@ def test_import(mocker): fname="file", rev="version", no_exec=False, + no_download=False, desc="description", jobs=3, ) @@ -69,6 +70,43 @@ def test_import_no_exec(mocker): fname="file", rev="version", no_exec=True, + no_download=False, + desc="description", + jobs=None, + ) + + +def test_import_no_download(mocker): + cli_args = parse_args( + [ + "import", + "repo_url", + "src", + "--out", + "out", + "--file", + "file", + "--rev", + "version", + "--no-download", + "--desc", + "description", + ] + ) + + cmd = cli_args.func(cli_args) + m = mocker.patch.object(cmd.repo, "imp", autospec=True) + + assert cmd.run() == 0 + + m.assert_called_once_with( + "repo_url", + path="src", + out="out", + fname="file", + rev="version", + no_exec=False, + no_download=True, desc="description", jobs=None, ) diff --git a/tests/unit/command/test_imp_url.py b/tests/unit/command/test_imp_url.py index 3204809c3ad..3f07f6a2417 100644 --- a/tests/unit/command/test_imp_url.py +++ b/tests/unit/command/test_imp_url.py @@ -1,5 +1,7 @@ import logging +import pytest + from dvc.cli import parse_args from dvc.commands.imp_url import CmdImportUrl from dvc.exceptions import DvcException @@ -31,6 +33,7 @@ def test_import_url(mocker): out="out", fname="file", no_exec=False, + no_download=False, remote=None, to_remote=False, desc="description", @@ -54,11 +57,19 @@ def test_failed_import_url(mocker, caplog): assert expected_error in caplog.text -def test_import_url_no_exec(mocker): +@pytest.mark.parametrize( + "flag,expected", + [ + ("--no-exec", {"no_exec": True, "no_download": False}), + ("--no-download", {"no_download": True, "no_exec": False}), + ], +) +def test_import_url_no_exec_download_flags(mocker, flag, expected): + cli_args = parse_args( [ "import-url", - "--no-exec", + flag, "src", "out", "--file", @@ -77,11 +88,11 @@ def test_import_url_no_exec(mocker): "src", out="out", fname="file", - no_exec=True, remote=None, to_remote=False, desc="description", jobs=None, + **expected ) @@ -110,6 +121,7 @@ def test_import_url_to_remote(mocker): out="bar", fname=None, no_exec=False, + no_download=False, remote="remote", to_remote=True, desc="description", @@ -117,7 +129,8 @@ def test_import_url_to_remote(mocker): ) -def test_import_url_to_remote_invalid_combination(dvc, mocker, caplog): +@pytest.mark.parametrize("flag", ["--no-exec", "--no-download"]) +def test_import_url_to_remote_invalid_combination(dvc, mocker, caplog, flag): cli_args = parse_args( [ "import-url", @@ -126,7 +139,7 @@ def test_import_url_to_remote_invalid_combination(dvc, mocker, caplog): "--to-remote", "--remote", "remote", - "--no-exec", + flag, ] ) assert cli_args.func == CmdImportUrl @@ -134,9 +147,14 @@ def test_import_url_to_remote_invalid_combination(dvc, mocker, caplog): cmd = cli_args.func(cli_args) with caplog.at_level(logging.ERROR, logger="dvc"): assert cmd.run() == 1 - expected_msg = "--no-exec can't be combined with --to-remote" + expected_msg = ( + "--no-exec/--no-download cannot be combined with --to-remote" + ) assert expected_msg in caplog.text + +def test_import_url_to_remote_flag(dvc, mocker, caplog): + cli_args = parse_args( ["import-url", "s3://bucket/foo", "bar", "--remote", "remote"] ) diff --git a/tests/unit/command/test_update.py b/tests/unit/command/test_update.py index 3aa162181c5..d6319552b1c 100644 --- a/tests/unit/command/test_update.py +++ b/tests/unit/command/test_update.py @@ -26,6 +26,7 @@ def test_update(dvc, mocker): rev="REV", recursive=True, to_remote=False, + no_download=False, remote=None, jobs=8, ) @@ -56,6 +57,7 @@ def test_update_to_remote(dvc, mocker): rev=None, recursive=True, to_remote=True, + no_download=False, remote="remote", jobs=5, )