diff --git a/dvc/commands/repro.py b/dvc/commands/repro.py index fdb0659e2a..e432ffe8fb 100644 --- a/dvc/commands/repro.py +++ b/dvc/commands/repro.py @@ -38,6 +38,7 @@ def _common_kwargs(self): "recursive": self.args.recursive, "force_downstream": self.args.force_downstream, "pull": self.args.pull, + "allow_missing": self.args.allow_missing, } @property @@ -135,6 +136,12 @@ def add_arguments(repro_parser): "from the run-cache." ), ) + repro_parser.add_argument( + "--allow-missing", + action="store_true", + default=False, + help=("Skip stages with missing data but no other changes."), + ) repro_parser.add_argument( "--dry", action="store_true", diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index f83cc638a6..56517088c8 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -319,20 +319,22 @@ def env(self, checkpoint_func=None) -> Env: env.update(current) return env - def changed_deps(self) -> bool: + def changed_deps(self, allow_missing: bool = False) -> bool: if self.frozen: return False if self.is_callback or self.always_changed or self.is_checkpoint: return True - return self._changed_deps() + return self._changed_deps(allow_missing=allow_missing) @rwlocked(read=["deps"]) - def _changed_deps(self) -> bool: + def _changed_deps(self, allow_missing: bool = False) -> bool: for dep in self.deps: status = dep.status() if status: + if allow_missing and status[str(dep)] == "deleted": + continue logger.debug( "Dependency '%s' of %s changed because it is '%s'.", dep, @@ -343,10 +345,12 @@ def _changed_deps(self) -> bool: return False @rwlocked(read=["outs"]) - def changed_outs(self) -> bool: + def changed_outs(self, allow_missing: bool = False) -> bool: for out in self.outs: status = out.status() if status: + if allow_missing and status[str(out)] == "not in cache": + continue logger.debug( "Output '%s' of %s changed because it is '%s'.", out, @@ -364,13 +368,13 @@ def changed_stage(self) -> bool: return changed @rwlocked(read=["deps", "outs"]) - def changed(self) -> bool: + def changed(self, allow_missing: bool = False) -> bool: is_changed = ( # Short-circuit order: stage md5 is fast, # deps are expected to change self.changed_stage() - or self.changed_deps() - or self.changed_outs() + or self.changed_deps(allow_missing=allow_missing) + or self.changed_outs(allow_missing=allow_missing) ) if is_changed: logger.debug("%s changed.", self) @@ -421,7 +425,10 @@ def transfer( @rwlocked(read=["deps"], write=["outs"]) def reproduce(self, interactive=False, **kwargs) -> Optional["Stage"]: - if not (kwargs.get("force", False) or self.changed()): + if not ( + kwargs.get("force", False) + or self.changed(kwargs.get("allow_missing", False)) + ): if not isinstance(self, PipelineStage) and self.is_data_source: logger.info("'%s' didn't change, skipping", self.addressing) else: @@ -601,7 +608,7 @@ def run( # noqa: C901 ) or self.is_partial_import: self._sync_import(dry, force, kwargs.get("jobs", None), no_download) elif not self.frozen and self.cmd: - self._run_stage(dry, force, **kwargs) + self._run_stage(dry, force, allow_missing=allow_missing, **kwargs) elif kwargs.get("pull"): logger.info("Pulling data for %s", self) for objs in self.get_used_objs().values(): diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 3ae7d12b10..de51a55561 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -133,10 +133,25 @@ def cmd_run(stage, dry=False, checkpoint_func=None, run_env=None): _run(stage, executable, cmd, checkpoint_func=checkpoint_func, **kwargs) +def _pull_missing_deps(stage): + for dep in stage.deps: + if not dep.exists: + stage.repo.pull(dep.def_path) + + def run_stage( - stage, dry=False, force=False, checkpoint_func=None, run_env=None, **kwargs + stage, + dry=False, + force=False, + checkpoint_func=None, + run_env=None, + allow_missing: bool = False, + **kwargs, ): if not (force or checkpoint_func): + if allow_missing and kwargs.get("pull") and not dry: + _pull_missing_deps(stage) + from .cache import RunCacheNotFoundError try: diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index 9ac52a1fcd..7c85634194 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -427,7 +427,22 @@ def test_repro_pulls_mising_import(tmp_dir, dvc, mocker, erepo_dir, local_remote assert dvc.reproduce(pull=True) -def test_repro_pulls_intermediate_out(tmp_dir, dvc, mocker, local_remote): +def test_repro_allow_missing(tmp_dir, dvc): + tmp_dir.gen("fixed", "fixed") + dvc.stage.add(name="create-foo", cmd="echo foo > foo", deps=["fixed"], outs=["foo"]) + dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"]) + (create_foo, copy_foo) = dvc.reproduce() + + remove("foo") + remove(create_foo.outs[0].cache_path) + remove(dvc.stage_cache.cache_dir) + + ret = dvc.reproduce(allow_missing=True) + # both stages are skipped + assert not ret + + +def test_repro_allow_missing_and_pull(tmp_dir, dvc, mocker, local_remote): tmp_dir.gen("fixed", "fixed") dvc.stage.add(name="create-foo", cmd="echo foo > foo", deps=["fixed"], outs=["foo"]) dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"]) @@ -437,5 +452,8 @@ def test_repro_pulls_intermediate_out(tmp_dir, dvc, mocker, local_remote): remove("foo") remove(create_foo.outs[0].cache_path) + remove(dvc.stage_cache.cache_dir) - assert dvc.reproduce(pull=True) + ret = dvc.reproduce(pull=True, allow_missing=True) + # create-foo is skipped ; copy-foo pulls missing dep + assert len(ret) == 1 diff --git a/tests/unit/command/test_repro.py b/tests/unit/command/test_repro.py index bd30d0afef..3474ce0c9b 100644 --- a/tests/unit/command/test_repro.py +++ b/tests/unit/command/test_repro.py @@ -12,6 +12,7 @@ "recursive": False, "force_downstream": False, "pull": False, + "allow_missing": False, "targets": [], } repro_arguments = {