Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repro: Add --allow-missing #9437

Merged
merged 1 commit into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dvc/commands/repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def _common_kwargs(self):
"recursive": self.args.recursive,
"force_downstream": self.args.force_downstream,
"pull": self.args.pull,
"allow_missing": self.args.allow_missing,
}

@property
Expand Down Expand Up @@ -135,6 +136,12 @@ def add_arguments(repro_parser):
"from the run-cache."
),
)
repro_parser.add_argument(
"--allow-missing",
action="store_true",
default=False,
help=("Skip stages with missing data but no other changes."),
)
repro_parser.add_argument(
"--dry",
action="store_true",
Expand Down
25 changes: 16 additions & 9 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,20 +319,22 @@ def env(self, checkpoint_func=None) -> Env:
env.update(current)
return env

def changed_deps(self) -> bool:
def changed_deps(self, allow_missing: bool = False) -> bool:
if self.frozen:
return False

if self.is_callback or self.always_changed or self.is_checkpoint:
return True

return self._changed_deps()
return self._changed_deps(allow_missing=allow_missing)

@rwlocked(read=["deps"])
def _changed_deps(self) -> bool:
def _changed_deps(self, allow_missing: bool = False) -> bool:
for dep in self.deps:
status = dep.status()
if status:
if allow_missing and status[str(dep)] == "deleted":
continue
logger.debug(
"Dependency '%s' of %s changed because it is '%s'.",
dep,
Expand All @@ -343,10 +345,12 @@ def _changed_deps(self) -> bool:
return False

@rwlocked(read=["outs"])
def changed_outs(self) -> bool:
def changed_outs(self, allow_missing: bool = False) -> bool:
for out in self.outs:
status = out.status()
if status:
if allow_missing and status[str(out)] == "not in cache":
continue
logger.debug(
"Output '%s' of %s changed because it is '%s'.",
out,
Expand All @@ -364,13 +368,13 @@ def changed_stage(self) -> bool:
return changed

@rwlocked(read=["deps", "outs"])
def changed(self) -> bool:
def changed(self, allow_missing: bool = False) -> bool:
is_changed = (
# Short-circuit order: stage md5 is fast,
# deps are expected to change
self.changed_stage()
or self.changed_deps()
or self.changed_outs()
or self.changed_deps(allow_missing=allow_missing)
or self.changed_outs(allow_missing=allow_missing)
)
if is_changed:
logger.debug("%s changed.", self)
Expand Down Expand Up @@ -421,7 +425,10 @@ def transfer(

@rwlocked(read=["deps"], write=["outs"])
def reproduce(self, interactive=False, **kwargs) -> Optional["Stage"]:
if not (kwargs.get("force", False) or self.changed()):
if not (
kwargs.get("force", False)
or self.changed(kwargs.get("allow_missing", False))
):
if not isinstance(self, PipelineStage) and self.is_data_source:
logger.info("'%s' didn't change, skipping", self.addressing)
else:
Expand Down Expand Up @@ -601,7 +608,7 @@ def run( # noqa: C901
) or self.is_partial_import:
self._sync_import(dry, force, kwargs.get("jobs", None), no_download)
elif not self.frozen and self.cmd:
self._run_stage(dry, force, **kwargs)
self._run_stage(dry, force, allow_missing=allow_missing, **kwargs)
elif kwargs.get("pull"):
logger.info("Pulling data for %s", self)
self.repo.pull(self.addressing, jobs=kwargs.get("jobs", None))
Expand Down
17 changes: 16 additions & 1 deletion dvc/stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,25 @@ def cmd_run(stage, dry=False, checkpoint_func=None, run_env=None):
_run(stage, executable, cmd, checkpoint_func=checkpoint_func, **kwargs)


def _pull_missing_deps(stage):
for dep in stage.deps:
if not dep.exists:
stage.repo.pull(dep.def_path)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skshetry I couldn't find a way to do this without using the high level API



def run_stage(
stage, dry=False, force=False, checkpoint_func=None, run_env=None, **kwargs
stage,
dry=False,
force=False,
checkpoint_func=None,
run_env=None,
allow_missing: bool = False,
**kwargs,
):
if not (force or checkpoint_func):
if allow_missing and kwargs.get("pull") and not dry:
_pull_missing_deps(stage)

from .cache import RunCacheNotFoundError

try:
Expand Down
22 changes: 20 additions & 2 deletions tests/func/test_repro_multistage.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,22 @@ def test_repro_pulls_mising_import(tmp_dir, dvc, mocker, erepo_dir, local_remote
assert dvc.reproduce(pull=True)


def test_repro_pulls_intermediate_out(tmp_dir, dvc, mocker, local_remote):
def test_repro_allow_missing(tmp_dir, dvc):
tmp_dir.gen("fixed", "fixed")
dvc.stage.add(name="create-foo", cmd="echo foo > foo", deps=["fixed"], outs=["foo"])
dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"])
(create_foo, copy_foo) = dvc.reproduce()

remove("foo")
remove(create_foo.outs[0].cache_path)
remove(dvc.stage_cache.cache_dir)

ret = dvc.reproduce(allow_missing=True)
# both stages are skipped
assert not ret


def test_repro_allow_missing_and_pull(tmp_dir, dvc, mocker, local_remote):
tmp_dir.gen("fixed", "fixed")
dvc.stage.add(name="create-foo", cmd="echo foo > foo", deps=["fixed"], outs=["foo"])
dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"])
Expand All @@ -437,5 +452,8 @@ def test_repro_pulls_intermediate_out(tmp_dir, dvc, mocker, local_remote):

remove("foo")
remove(create_foo.outs[0].cache_path)
remove(dvc.stage_cache.cache_dir)

assert dvc.reproduce(pull=True)
ret = dvc.reproduce(pull=True, allow_missing=True)
# create-foo is skipped ; copy-foo pulls missing dep
assert len(ret) == 1
1 change: 1 addition & 0 deletions tests/unit/command/test_repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"recursive": False,
"force_downstream": False,
"pull": False,
"allow_missing": False,
"targets": [],
}
repro_arguments = {
Expand Down