Skip to content

Commit

Permalink
repro: Add --allow-missing.
Browse files Browse the repository at this point in the history
Point 2 in #9375 (comment)
  • Loading branch information
daavoo committed May 12, 2023
1 parent e8d0f24 commit a1a49cc
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 12 deletions.
7 changes: 7 additions & 0 deletions dvc/commands/repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def _common_kwargs(self):
"recursive": self.args.recursive,
"force_downstream": self.args.force_downstream,
"pull": self.args.pull,
"allow_missing": self.args.allow_missing,
}

@property
Expand Down Expand Up @@ -135,6 +136,12 @@ def add_arguments(repro_parser):
"from the run-cache."
),
)
repro_parser.add_argument(
"--allow-missing",
action="store_true",
default=False,
help=("Skip stages with missing data but no other changes."),
)
repro_parser.add_argument(
"--dry",
action="store_true",
Expand Down
25 changes: 16 additions & 9 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,20 +319,22 @@ def env(self, checkpoint_func=None) -> Env:
env.update(current)
return env

def changed_deps(self) -> bool:
def changed_deps(self, allow_missing: bool = False) -> bool:
if self.frozen:
return False

if self.is_callback or self.always_changed or self.is_checkpoint:
return True

return self._changed_deps()
return self._changed_deps(allow_missing=allow_missing)

@rwlocked(read=["deps"])
def _changed_deps(self) -> bool:
def _changed_deps(self, allow_missing: bool = False) -> bool:
for dep in self.deps:
status = dep.status()
if status:
if allow_missing and status[str(dep)] == "deleted":
continue
logger.debug(
"Dependency '%s' of %s changed because it is '%s'.",
dep,
Expand All @@ -343,10 +345,12 @@ def _changed_deps(self) -> bool:
return False

@rwlocked(read=["outs"])
def changed_outs(self) -> bool:
def changed_outs(self, allow_missing: bool = False) -> bool:
for out in self.outs:
status = out.status()
if status:
if allow_missing and status[str(out)] == "not in cache":
continue
logger.debug(
"Output '%s' of %s changed because it is '%s'.",
out,
Expand All @@ -364,13 +368,13 @@ def changed_stage(self) -> bool:
return changed

@rwlocked(read=["deps", "outs"])
def changed(self) -> bool:
def changed(self, allow_missing: bool = False) -> bool:
is_changed = (
# Short-circuit order: stage md5 is fast,
# deps are expected to change
self.changed_stage()
or self.changed_deps()
or self.changed_outs()
or self.changed_deps(allow_missing=allow_missing)
or self.changed_outs(allow_missing=allow_missing)
)
if is_changed:
logger.debug("%s changed.", self)
Expand Down Expand Up @@ -421,7 +425,10 @@ def transfer(

@rwlocked(read=["deps"], write=["outs"])
def reproduce(self, interactive=False, **kwargs) -> Optional["Stage"]:
if not (kwargs.get("force", False) or self.changed()):
if not (
kwargs.get("force", False)
or self.changed(kwargs.get("allow_missing", False))
):
if not isinstance(self, PipelineStage) and self.is_data_source:
logger.info("'%s' didn't change, skipping", self.addressing)
else:
Expand Down Expand Up @@ -601,7 +608,7 @@ def run( # noqa: C901
) or self.is_partial_import:
self._sync_import(dry, force, kwargs.get("jobs", None), no_download)
elif not self.frozen and self.cmd:
self._run_stage(dry, force, **kwargs)
self._run_stage(dry, force, allow_missing=allow_missing, **kwargs)
elif kwargs.get("pull"):
logger.info("Pulling data for %s", self)
for objs in self.get_used_objs().values():
Expand Down
17 changes: 16 additions & 1 deletion dvc/stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,25 @@ def cmd_run(stage, dry=False, checkpoint_func=None, run_env=None):
_run(stage, executable, cmd, checkpoint_func=checkpoint_func, **kwargs)


def _pull_missing_deps(stage):
    """Pull each dependency of ``stage`` that does not currently exist.

    Every dependency whose ``exists`` attribute is falsy is pulled
    individually by passing its ``def_path`` to ``stage.repo.pull``.
    """
    # Keep this a lazy generator: each dependency's ``exists`` is checked only
    # after the previous pull has finished, same as a plain check-then-pull loop.
    absent = (dependency for dependency in stage.deps if not dependency.exists)
    for dependency in absent:
        stage.repo.pull(dependency.def_path)


def run_stage(
stage, dry=False, force=False, checkpoint_func=None, run_env=None, **kwargs
stage,
dry=False,
force=False,
checkpoint_func=None,
run_env=None,
allow_missing: bool = False,
**kwargs,
):
if not (force or checkpoint_func):
if allow_missing and kwargs.get("pull") and not dry:
_pull_missing_deps(stage)

from .cache import RunCacheNotFoundError

try:
Expand Down
22 changes: 20 additions & 2 deletions tests/func/test_repro_multistage.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,22 @@ def test_repro_pulls_mising_import(tmp_dir, dvc, mocker, erepo_dir, local_remote
assert dvc.reproduce(pull=True)


def test_repro_pulls_intermediate_out(tmp_dir, dvc, mocker, local_remote):
def test_repro_allow_missing(tmp_dir, dvc):
    """Reproducing with ``allow_missing=True`` returns nothing once ``foo`` is gone."""
    tmp_dir.gen("fixed", "fixed")
    dvc.stage.add(
        name="create-foo",
        cmd="echo foo > foo",
        deps=["fixed"],
        outs=["foo"],
    )
    dvc.stage.add(
        name="copy-foo",
        cmd="cp foo bar",
        deps=["foo"],
        outs=["bar"],
    )
    # The first full run is expected to yield exactly the two stages added above.
    producer, _consumer = dvc.reproduce()

    # Remove "foo" from the workspace, the cache entry of create-foo's output,
    # and the stage cache directory.
    remove("foo")
    remove(producer.outs[0].cache_path)
    remove(dvc.stage_cache.cache_dir)

    reproduced = dvc.reproduce(allow_missing=True)
    # both stages are skipped
    assert not reproduced


def test_repro_allow_missing_and_pull(tmp_dir, dvc, mocker, local_remote):
tmp_dir.gen("fixed", "fixed")
dvc.stage.add(name="create-foo", cmd="echo foo > foo", deps=["fixed"], outs=["foo"])
dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"])
Expand All @@ -437,5 +452,8 @@ def test_repro_pulls_intermediate_out(tmp_dir, dvc, mocker, local_remote):

remove("foo")
remove(create_foo.outs[0].cache_path)
remove(dvc.stage_cache.cache_dir)

assert dvc.reproduce(pull=True)
ret = dvc.reproduce(pull=True, allow_missing=True)
# create-foo is skipped; copy-foo pulls the missing dep
assert len(ret) == 1
1 change: 1 addition & 0 deletions tests/unit/command/test_repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"recursive": False,
"force_downstream": False,
"pull": False,
"allow_missing": False,
"targets": [],
}
repro_arguments = {
Expand Down

0 comments on commit a1a49cc

Please sign in to comment.