diff --git a/dvc/api/__init__.py b/dvc/api/__init__.py index 693a1fb2bc..74cbe51e1f 100644 --- a/dvc/api/__init__.py +++ b/dvc/api/__init__.py @@ -2,13 +2,12 @@ from .data import open # pylint: disable=redefined-builtin from .data import get_url, read -from .experiments import exp_save, make_checkpoint +from .experiments import exp_save from .show import metrics_show, params_show __all__ = [ "exp_save", "get_url", - "make_checkpoint", "open", "params_show", "metrics_show", diff --git a/dvc/api/experiments.py b/dvc/api/experiments.py index cc6082d6e6..5d775ec1cd 100644 --- a/dvc/api/experiments.py +++ b/dvc/api/experiments.py @@ -1,37 +1,6 @@ -import builtins -import os -from time import sleep from typing import List, Optional -from dvc.env import DVC_CHECKPOINT, DVC_ROOT from dvc.repo import Repo -from dvc.stage.monitor import CheckpointTask - - -def make_checkpoint(): - """ - Signal DVC to create a checkpoint experiment. - - If the current process is being run from DVC, this function will block - until DVC has finished creating the checkpoint. Otherwise, this function - will return immediately. - """ - if os.getenv(DVC_CHECKPOINT) is None: - return - - root_dir = os.getenv(DVC_ROOT, Repo.find_root()) - signal_file = os.path.join( - root_dir, Repo.DVC_DIR, "tmp", CheckpointTask.SIGNAL_FILE - ) - - with builtins.open(signal_file, "w", encoding="utf-8") as fobj: - # NOTE: force flushing/writing empty file to disk, otherwise when - # run in certain contexts (pytest) file may not actually be written - fobj.write("") - fobj.flush() - os.fsync(fobj.fileno()) - while os.path.exists(signal_file): - sleep(0.1) def exp_save( diff --git a/dvc/commands/experiments/run.py b/dvc/commands/experiments/run.py index bbc5954872..53c35fa332 100644 --- a/dvc/commands/experiments/run.py +++ b/dvc/commands/experiments/run.py @@ -1,12 +1,9 @@ import argparse import logging -from dvc.cli import completion from dvc.cli.utils import append_doc_link from dvc.commands.repro import CmdRepro from dvc.commands.repro import add_arguments as add_repro_arguments -from dvc.exceptions import InvalidArgumentError -from dvc.ui import ui logger = logging.getLogger(__name__) @@ -15,25 +12,12 @@ class CmdExperimentsRun(CmdRepro): def run(self): from dvc.compare import show_metrics - if self.args.checkpoint_resume: - if self.args.reset: - raise InvalidArgumentError("--reset and --rev are mutually exclusive.") - if not (self.args.queue or self.args.tmp_dir): - raise InvalidArgumentError( - "--rev can only be used in conjunction with --queue or --temp." - ) - - if self.args.reset: - ui.write("Any existing checkpoints will be reset and re-run.") - results = self.repo.experiments.run( name=self.args.name, queue=self.args.queue, run_all=self.args.run_all, jobs=self.args.jobs, params=self.args.set_param, - checkpoint_resume=self.args.checkpoint_resume, - reset=self.args.reset, tmp_dir=self.args.tmp_dir, machine=self.args.machine, **self._common_kwargs, @@ -57,22 +41,6 @@ def add_parser(experiments_subparsers, parent_parser): formatter_class=argparse.RawDescriptionHelpFormatter, ) _add_run_common(experiments_run_parser) - experiments_run_parser.add_argument( - "-r", - "--rev", - type=str, - dest="checkpoint_resume", - help=( - "Continue the specified checkpoint experiment. Can only be used " - "in conjunction with --queue or --temp." - ), - metavar="", - ).complete = completion.EXPERIMENT - experiments_run_parser.add_argument( - "--reset", - action="store_true", - help="Reset existing checkpoints and restart the experiment.", - ) experiments_run_parser.set_defaults(func=CmdExperimentsRun) diff --git a/dvc/commands/experiments/show.py b/dvc/commands/experiments/show.py index 555f12e93a..c51bbf2d10 100644 --- a/dvc/commands/experiments/show.py +++ b/dvc/commands/experiments/show.py @@ -53,16 +53,13 @@ def _collect_names(all_experiments, **kwargs): experiment_types = { - "checkpoint_tip": "│ ╓", - "checkpoint_commit": "│ ╟", - "checkpoint_base": "├─╨", "branch_commit": "├──", "branch_base": "└──", "baseline": "", } -def _collect_rows( # noqa: C901, PLR0912, PLR0913, PLR0915 +def _collect_rows( # noqa: C901, PLR0913 base_rev, experiments, all_headers, @@ -86,7 +83,6 @@ def _collect_rows( # noqa: C901, PLR0912, PLR0913, PLR0915 reverse = sort_order == "desc" experiments = _sort_exp(experiments, sort_path, sort_name, sort_type, reverse) - new_checkpoint = True for i, (rev, results) in enumerate(experiments.items()): fill_value = FILL_VALUE_ERRORED if results.get("error") else fill_value row_dict = {k: fill_value for k in all_headers} @@ -104,30 +100,14 @@ def _collect_rows( # noqa: C901, PLR0912, PLR0913, PLR0915 else: name_rev = rev[:7] - tip = exp.get("checkpoint_tip") - parent_rev = exp.get("checkpoint_parent", "") - parent_exp = experiments.get(parent_rev, {}).get("data", {}) - parent_tip = parent_exp.get("checkpoint_tip") - parent = "" if is_baseline: typ = "baseline" - elif tip: - if tip == parent_tip: - typ = "checkpoint_tip" if new_checkpoint else "checkpoint_commit" - elif parent_rev == base_rev: - typ = "checkpoint_base" - else: - typ = "checkpoint_commit" - parent = parent_rev[:7] elif i < len(experiments) - 1: typ = "branch_commit" else: typ = "branch_base" - if not is_baseline: - new_checkpoint = not (tip and tip == parent_tip) - row_dict["Experiment"] = exp.get("name", "") row_dict["rev"] = name_rev row_dict["typ"] = typ @@ -191,12 +171,8 @@ def _sort_column(sort_by, metric_names, param_names): def _sort_exp(experiments, sort_path, sort_name, typ, reverse): def _sort(item): - rev, exp = item + _, exp = item exp_data = exp.get("data", {}) - tip = exp_data.get("checkpoint_tip") - if tip and tip != rev: - # Sort checkpoint experiments by tip commit - return _sort((tip, experiments[tip])) data = exp_data.get(typ, {}).get(sort_path, {}).get("data", {}) val = flatten(data).get(sort_name) return val is None, val diff --git a/dvc/commands/run.py b/dvc/commands/run.py index a5e74f11f3..a08b91e9e6 100644 --- a/dvc/commands/run.py +++ b/dvc/commands/run.py @@ -22,7 +22,6 @@ def run(self): self.args.plots_no_cache, self.args.outs_persist, self.args.outs_persist_no_cache, - self.args.checkpoints, self.args.params, self.args.command, ] @@ -39,7 +38,7 @@ def run(self): { "cmd": parse_cmd(self.args.command), "fname": kwargs.pop("file"), - "no_exec": self.args.no_exec or bool(self.args.checkpoints), + "no_exec": self.args.no_exec, "run_cache": not kwargs.pop("no_run_cache"), } ) diff --git a/dvc/commands/stage.py b/dvc/commands/stage.py index f3c11aec3c..c3953829aa 100644 --- a/dvc/commands/stage.py +++ b/dvc/commands/stage.py @@ -172,17 +172,6 @@ def _add_common_args(parser): help="Declare output file or directory (do not put into DVC cache).", metavar="", ).complete = completion.FILE - parser.add_argument( - "-c", - "--checkpoints", - action="append", - default=[], - help=( - "Declare checkpoint output file or directory for 'dvc exp run'. " - "Not compatible with 'dvc repro'." - ), - metavar="", - ).complete = completion.FILE parser.add_argument( "--external", action="store_true", diff --git a/dvc/env.py b/dvc/env.py index dac52bf4e1..61af64baf0 100644 --- a/dvc/env.py +++ b/dvc/env.py @@ -1,8 +1,6 @@ -DVC_CHECKPOINT = "DVC_CHECKPOINT" DVC_DAEMON = "DVC_DAEMON" DVC_PAGER = "DVC_PAGER" DVC_ROOT = "DVC_ROOT" -DVCLIVE_RESUME = "DVCLIVE_RESUME" DVC_IGNORE_ISATTY = "DVC_IGNORE_ISATTY" DVC_EXP_BASELINE_REV = "DVC_EXP_BASELINE_REV" DVC_EXP_NAME = "DVC_EXP_NAME" diff --git a/dvc/output.py b/dvc/output.py index 8267382e25..a93a45ecff 100644 --- a/dvc/output.py +++ b/dvc/output.py @@ -91,7 +91,6 @@ def loadd_from(stage, d_list): metric = d.pop(Output.PARAM_METRIC, False) plot = d.pop(Output.PARAM_PLOT, False) persist = d.pop(Output.PARAM_PERSIST, False) - checkpoint = d.pop(Output.PARAM_CHECKPOINT, False) remote = d.pop(Output.PARAM_REMOTE, None) annot = {field: d.pop(field, None) for field in ANNOTATION_FIELDS} files = d.pop(Output.PARAM_FILES, None) @@ -105,7 +104,6 @@ def loadd_from(stage, d_list): metric=metric, plot=plot, persist=persist, - checkpoint=checkpoint, remote=remote, **annot, files=files, @@ -122,7 +120,6 @@ def loads_from( metric=False, plot=False, persist=False, - checkpoint=False, remote=None, push=True, ): @@ -135,7 +132,6 @@ def loads_from( metric=metric, plot=plot, persist=persist, - checkpoint=checkpoint, remote=remote, push=push, ) @@ -191,7 +187,6 @@ def load_from_pipeline(stage, data, typ="outs"): [ Output.PARAM_CACHE, Output.PARAM_PERSIST, - Output.PARAM_CHECKPOINT, Output.PARAM_REMOTE, Output.PARAM_PUSH, *ANNOTATION_FIELDS, @@ -281,7 +276,6 @@ class Output: PARAM_PATH = "path" PARAM_CACHE = "cache" - PARAM_CHECKPOINT = "checkpoint" PARAM_FILES = "files" PARAM_METRIC = "metric" PARAM_METRIC_TYPE = "type" @@ -322,7 +316,6 @@ def __init__( # noqa: PLR0913 metric=False, plot=False, persist=False, - checkpoint=False, desc=None, type=None, # noqa: A002, pylint: disable=redefined-builtin labels=None, @@ -389,7 +382,6 @@ def __init__( # noqa: PLR0913 self.metric = False if self.IS_DEPENDENCY else metric self.plot = False if self.IS_DEPENDENCY else plot self.persist = persist - self.checkpoint = checkpoint self.can_push = push self.fs_path = self._parse_path(self.fs, fs_path) @@ -816,9 +808,6 @@ def dumpd(self, **kwargs): # noqa: C901, PLR0912 if self.persist: ret[self.PARAM_PERSIST] = self.persist - if self.checkpoint: - ret[self.PARAM_CHECKPOINT] = self.checkpoint - if self.remote: ret[self.PARAM_REMOTE] = self.remote @@ -888,7 +877,6 @@ def checkout( relink: bool = False, filter_info: Optional[str] = None, allow_missing: bool = False, - checkpoint_reset: bool = False, **kwargs, ) -> Optional[Tuple[bool, Optional[bool]]]: if not self.use_cache: @@ -901,11 +889,6 @@ def checkout( # backward compatibility return None - if self.checkpoint and checkpoint_reset: - if self.exists: - self.remove() - return None - added = not self.exists try: @@ -922,7 +905,7 @@ def checkout( **kwargs, ) except CheckoutError: - if allow_missing or self.checkpoint: + if allow_missing: return None raise self.set_exec() @@ -1270,7 +1253,6 @@ def _merge_dir_version_meta(self, other: "Output"): Required(Output.PARAM_PATH): str, Output.PARAM_PLOT: bool, Output.PARAM_PERSIST: bool, - Output.PARAM_CHECKPOINT: bool, Output.PARAM_CLOUD: CLOUD_SCHEMA, } diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 7057b71212..72cacf4a7d 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -5,7 +5,6 @@ from funcy import chain, first -from dvc.exceptions import DvcException from dvc.ui import ui from dvc.utils import relpath from dvc.utils.objects import cached_property @@ -22,7 +21,6 @@ CELERY_FAILED_STASH, CELERY_STASH, EXEC_APPLY, - EXEC_CHECKPOINT, EXEC_NAMESPACE, EXPS_NAMESPACE, WORKSPACE_STASH, @@ -48,10 +46,7 @@ class Experiments: repo (dvc.repo.Repo): repo instance that these experiments belong to. """ - BRANCH_RE = re.compile( - r"^(?P[a-f0-9]{7})-(?P[a-f0-9]+)" - r"(?P-checkpoint)?$" - ) + BRANCH_RE = re.compile(r"^(?P[a-f0-9]{7})-(?P[a-f0-9]+)") def __init__(self, repo): from dvc.scm import NoSCMError @@ -134,47 +129,18 @@ def reproduce_one( def queue_one( self, queue: "BaseStashQueue", - checkpoint_resume: Optional[str] = None, - reset: bool = False, **kwargs, ) -> "QueueEntry": """Queue a single experiment.""" - if reset: - self.reset_checkpoints() - if kwargs.pop("machine", None) is not None: # TODO: decide how to handle queued remote execution raise NotImplementedError - if checkpoint_resume: - from dvc.scm import resolve_rev - - resume_rev = resolve_rev(self.scm, checkpoint_resume) - try: - self.check_baseline(resume_rev) - checkpoint_resume = resume_rev - except BaselineMismatchError as exc: - raise DvcException( - f"Cannot resume from '{checkpoint_resume}' as it is not " - "derived from your current workspace." - ) from exc - else: - checkpoint_resume = self._workspace_resume_rev() - return self.new( queue, - checkpoint_resume=checkpoint_resume, - reset=reset, **kwargs, ) - def _workspace_resume_rev(self) -> Optional[str]: - last_checkpoint = self._get_last_checkpoint() - last_applied = self._get_last_applied() - if last_checkpoint and last_applied: - return last_applied - return None - def reproduce_celery( # noqa: C901 self, entries: Optional[Iterable["QueueEntry"]] = None, **kwargs ) -> Dict[str, str]: @@ -253,17 +219,12 @@ def new( self, queue: "BaseStashQueue", *args, - checkpoint_resume: Optional[str] = None, **kwargs, ) -> "QueueEntry": """Create and enqueue a new experiment. Experiment will be derived from the current workspace. """ - if checkpoint_resume is not None: - return self._resume_checkpoint( - queue, *args, resume_rev=checkpoint_resume, **kwargs - ) name = kwargs.get("name", None) baseline_sha = kwargs.get("baseline_rev") or self.repo.scm.get_rev() @@ -277,59 +238,6 @@ def new( return queue.put(*args, **kwargs) - def _resume_checkpoint( - self, - queue: "BaseStashQueue", - *args, - resume_rev: Optional[str] = None, - **kwargs, - ) -> "QueueEntry": - """Create and queue a resumed checkpoint experiment.""" - assert resume_rev - - branch: Optional[str] = None - try: - allow_multiple = bool(kwargs.get("params", None)) - branch = self.get_branch_by_rev(resume_rev, allow_multiple=allow_multiple) - if not branch: - raise DvcException( - f"Could not find checkpoint experiment '{resume_rev[:7]}'" - ) - baseline_rev = self._get_baseline(branch) - except MultipleBranchError as exc: - baselines = { - info.baseline_sha for info in exc.ref_infos if info.baseline_sha - } - if len(baselines) == 1: - baseline_rev = baselines.pop() - else: - raise - - logger.debug( - "Checkpoint run from '%s' with baseline '%s'", - resume_rev[:7], - baseline_rev, - ) - return queue.put( - *args, - resume_rev=resume_rev, - baseline_rev=baseline_rev, - branch=branch, - **kwargs, - ) - - def _get_last_checkpoint(self) -> Optional[str]: - try: - last_checkpoint = self.scm.get_ref(EXEC_CHECKPOINT) - if last_checkpoint: - self.check_baseline(last_checkpoint) - return last_checkpoint - except BaselineMismatchError: - # If HEAD has moved since the the last checkpoint run, - # the specified checkpoint is no longer relevant - self.scm.remove_ref(EXEC_CHECKPOINT) - return None - def _get_last_applied(self) -> Optional[str]: try: last_applied = self.scm.get_ref(EXEC_APPLY) @@ -342,10 +250,6 @@ def _get_last_applied(self) -> Optional[str]: self.scm.remove_ref(EXEC_APPLY) return None - def reset_checkpoints(self): - self.scm.remove_ref(EXEC_CHECKPOINT) - self.scm.remove_ref(EXEC_APPLY) - @unlocked_repo def _reproduce_queue(self, queue: "BaseStashQueue", **kwargs) -> Dict[str, str]: """Reproduce queued experiments. diff --git a/dvc/repo/experiments/exceptions.py b/dvc/repo/experiments/exceptions.py index d83d4a0005..09568ef731 100644 --- a/dvc/repo/experiments/exceptions.py +++ b/dvc/repo/experiments/exceptions.py @@ -31,21 +31,6 @@ def __init__(self, name: str, command: str = "run"): self.name = name -class CheckpointExistsError(DvcException): - def __init__(self, name: str): - msg = ( - "Reproduced checkpoint experiment conflicts with existing " - f"experiment '{name}'. To restart (and overwrite) the existing " - "experiment run:\n\n" - "\tdvc exp run -f ...\n\n" - "To resume the existing experiment, run:\n\n" - f"\tdvc exp apply {name}\n" - "\tdvc exp run\n" - ) - super().__init__(msg) - self.name = name - - class InvalidExpRefError(DvcException): def __init__(self, ref): super().__init__(f"'{ref}' is not a valid experiment refname.") diff --git a/dvc/repo/experiments/executor/base.py b/dvc/repo/experiments/executor/base.py index 74bf160d0a..677985fc1e 100644 --- a/dvc/repo/experiments/executor/base.py +++ b/dvc/repo/experiments/executor/base.py @@ -5,7 +5,6 @@ from contextlib import contextmanager from dataclasses import asdict, dataclass from enum import IntEnum -from functools import partial from typing import ( TYPE_CHECKING, Any, @@ -26,16 +25,10 @@ from dvc.env import DVC_EXP_AUTO_PUSH, DVC_EXP_GIT_REMOTE from dvc.exceptions import DvcException -from dvc.repo.experiments.exceptions import CheckpointExistsError, ExperimentExistsError -from dvc.repo.experiments.refs import ( - EXEC_BASELINE, - EXEC_BRANCH, - EXEC_CHECKPOINT, - ExpRefInfo, -) +from dvc.repo.experiments.exceptions import ExperimentExistsError +from dvc.repo.experiments.refs import EXEC_BASELINE, EXEC_BRANCH, ExpRefInfo from dvc.repo.experiments.utils import to_studio_params from dvc.stage.serialize import to_lockfile -from dvc.ui import ui from dvc.utils import dict_sha256, env2bool, relpath from dvc.utils.fs import remove @@ -355,7 +348,7 @@ def fetch_exps( dest_scm: "Git", refs: List[str], force: bool = False, - on_diverged: Optional[Callable[[str, bool], None]] = None, + on_diverged: Optional[Callable[[str], None]] = None, **kwargs, ) -> Iterable[str]: """Fetch reproduced experiment refs into the specified SCM. @@ -364,41 +357,32 @@ def fetch_exps( dest_scm: Destination Git instance. refs: reference names to be fetched from the remotes. force: If True, diverged refs will be overwritten - on_diverged: Callback in the form on_diverged(ref, is_checkpoint) + on_diverged: Callback in the form on_diverged(ref) to be called when an experiment ref has diverged. Extra kwargs will be passed into the remote git client. """ - if EXEC_CHECKPOINT in refs: - refs.remove(EXEC_CHECKPOINT) - has_checkpoint = True - else: - has_checkpoint = False - def on_diverged_ref(orig_ref: str, new_rev: str): if force: logger.debug("Replacing existing experiment '%s'", orig_ref) return True if on_diverged: - return on_diverged(orig_ref, has_checkpoint) + return on_diverged(orig_ref) - self._raise_ref_conflict(dest_scm, orig_ref, new_rev, has_checkpoint) + self._raise_ref_conflict(dest_scm, orig_ref, new_rev) logger.debug("Reproduced existing experiment '%s'", orig_ref) return False # fetch experiments try: refspecs = [f"{ref}:{ref}" for ref in refs] - # update last run checkpoint (if it exists) - if has_checkpoint: - refspecs.append(f"{EXEC_CHECKPOINT}:{EXEC_CHECKPOINT}") dest_scm.fetch_refspecs( self.git_url, refspecs, on_diverged=on_diverged_ref, - force=force or has_checkpoint, + force=force, **kwargs, ) except SCMError: @@ -491,19 +475,6 @@ def filter_pipeline(stages): repro_dry = kwargs.get("dry") - # NOTE: checkpoint outs are handled as a special type of persist - # out: - # - # - checkpoint out may not yet exist if this is the first time this - # experiment has been run, this is not an error condition for - # experiments - # - if experiment was run with --reset, the checkpoint out will be - # removed at the start of the experiment (regardless of any - # dvc.lock entry for the checkpoint out) - # - if run without --reset, the checkpoint out will be checked out - # using any hash present in dvc.lock (or removed if no entry - # exists in dvc.lock) - checkpoint_reset: bool = kwargs.pop("reset", False) if not repro_dry: dvc_checkout( dvc, @@ -512,22 +483,13 @@ def filter_pipeline(stages): force=True, quiet=True, allow_missing=True, - checkpoint_reset=checkpoint_reset, recursive=kwargs.get("recursive", False), ) - checkpoint_func = partial( - cls.checkpoint_callback, - dvc, - dvc.scm, - info.name, - repro_force or checkpoint_reset, - ) stages = dvc_reproduce( dvc, *args, on_unchanged=filter_pipeline, - checkpoint_func=checkpoint_func, **kwargs, ) @@ -536,7 +498,6 @@ def filter_pipeline(stages): ref, exp_ref, repro_force = cls._repro_commit( dvc, info, - stages, exp_hash, auto_push, git_remote, @@ -556,19 +517,16 @@ def _repro_commit( cls, dvc, info, - stages, exp_hash, auto_push, git_remote, repro_force, ) -> Tuple[Optional[str], Optional["ExpRefInfo"], bool]: - is_checkpoint = any(stage.is_checkpoint for stage in stages) cls.commit( dvc.scm, exp_hash, exp_name=info.name, force=repro_force, - checkpoint=is_checkpoint, ) if auto_push: cls._auto_push(dvc, dvc.scm, git_remote) @@ -600,7 +558,6 @@ def _repro_dvc( # noqa: C901 from dvc_studio_client.post_live_metrics import post_live_metrics from dvc.repo import Repo - from dvc.stage.monitor import CheckpointKilledError with Repo(os.path.join(info.root_dir, info.dvc_dir)) as dvc: info.status = TaskStatus.RUNNING @@ -625,9 +582,6 @@ def _repro_dvc( # noqa: C901 logger.debug("Running repro in '%s'", os.getcwd()) yield dvc info.status = TaskStatus.SUCCESS - except CheckpointKilledError: - info.status = TaskStatus.FAILED - raise except DvcException: if log_errors: logger.exception("") @@ -691,24 +645,6 @@ def _auto_push( exc, ) - @classmethod - def checkpoint_callback( - cls, - dvc: "Repo", - scm: "Git", - name: Optional[str], - force: bool, - unchanged: Iterable["PipelineStage"], - stages: Iterable["PipelineStage"], - ): - exp_hash = cls.hash_exp(list(stages) + list(unchanged)) - exp_rev = cls.commit(scm, exp_hash, exp_name=name, force=force, checkpoint=True) - - if env2bool(DVC_EXP_AUTO_PUSH): - git_remote = os.getenv(DVC_EXP_GIT_REMOTE) - cls._auto_push(dvc, scm, git_remote) - ui.write(f"Checkpoint experiment iteration '{exp_rev[:7]}'.") - @classmethod def commit( cls, @@ -716,7 +652,6 @@ def commit( exp_hash: str, exp_name: Optional[str] = None, force: bool = False, - checkpoint: bool = False, ): """Commit stages as an experiment and return the commit SHA.""" rev = scm.get_rev() @@ -749,23 +684,20 @@ def commit( scm.commit(f"dvc: commit experiment {exp_hash}", no_verify=True) new_rev = scm.get_rev() if check_conflict: - new_rev = cls._raise_ref_conflict(scm, branch, new_rev, checkpoint) + new_rev = cls._raise_ref_conflict(scm, branch, new_rev) else: scm.set_ref(branch, new_rev, old_ref=old_ref) scm.set_ref(EXEC_BRANCH, branch, symbolic=True) - if checkpoint: - scm.set_ref(EXEC_CHECKPOINT, new_rev) + return new_rev @staticmethod - def _raise_ref_conflict(scm, ref, new_rev, checkpoint): + def _raise_ref_conflict(scm, ref, new_rev): # If this commit is a duplicate of the existing commit at 'ref', return # the existing commit. Otherwise, error out and require user to re-run # with --force as needed orig_rev = scm.get_ref(ref) if scm.diff(orig_rev, new_rev): - if checkpoint: - raise CheckpointExistsError(ref) raise ExperimentExistsError(ref) return orig_rev diff --git a/dvc/repo/experiments/executor/local.py b/dvc/repo/experiments/executor/local.py index 07eb4ab82c..7f9774855a 100644 --- a/dvc/repo/experiments/executor/local.py +++ b/dvc/repo/experiments/executor/local.py @@ -9,10 +9,8 @@ from dvc.lock import LockError from dvc.repo.experiments.refs import ( - EXEC_APPLY, EXEC_BASELINE, EXEC_BRANCH, - EXEC_CHECKPOINT, EXEC_HEAD, EXEC_MERGE, EXEC_NAMESPACE, @@ -123,9 +121,6 @@ def init_git( if self.scm.get_ref(EXEC_BRANCH): self.scm.remove_ref(EXEC_BRANCH) - if self.scm.get_ref(EXEC_CHECKPOINT): - self.scm.remove_ref(EXEC_CHECKPOINT) - # checkout EXEC_HEAD and apply EXEC_MERGE on top of it without # committing assert isinstance(self.scm, Git) @@ -181,7 +176,6 @@ class WorkspaceExecutor(BaseLocalExecutor): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._detach_stack = ExitStack() - self._orig_checkpoint = self.scm.get_ref(EXEC_CHECKPOINT) @classmethod def from_stash_entry( @@ -243,6 +237,3 @@ def cleanup(self, infofile: Optional[str] = None): self.scm.remove_ref(EXEC_MERGE) if self.scm.get_ref(EXEC_BRANCH): self.scm.remove_ref(EXEC_BRANCH) - checkpoint = self.scm.get_ref(EXEC_CHECKPOINT) - if checkpoint and checkpoint != self._orig_checkpoint: - self.scm.set_ref(EXEC_APPLY, checkpoint) diff --git a/dvc/repo/experiments/executor/ssh.py b/dvc/repo/experiments/executor/ssh.py index d71a7e606d..e1f5daff62 100644 --- a/dvc/repo/experiments/executor/ssh.py +++ b/dvc/repo/experiments/executor/ssh.py @@ -11,7 +11,6 @@ from dvc.repo.experiments.refs import ( EXEC_BASELINE, EXEC_BRANCH, - EXEC_CHECKPOINT, EXEC_HEAD, EXEC_MERGE, EXEC_NAMESPACE, @@ -162,7 +161,6 @@ def init_git( self._ssh_cmd(fs, f"git symbolic-ref {EXEC_BRANCH} {branch}") else: self._ssh_cmd(fs, f"git symbolic-ref -d {EXEC_BRANCH}", check=False) - self._ssh_cmd(fs, f"git update-ref -d {EXEC_CHECKPOINT}", check=False) # checkout EXEC_HEAD and apply EXEC_MERGE on top of it without # committing diff --git a/dvc/repo/experiments/queue/base.py b/dvc/repo/experiments/queue/base.py index 72913d59df..e519fa690c 100644 --- a/dvc/repo/experiments/queue/base.py +++ b/dvc/repo/experiments/queue/base.py @@ -22,10 +22,9 @@ from funcy import retry from dvc.dependency import ParamsDependency -from dvc.env import DVC_EXP_BASELINE_REV, DVC_EXP_NAME, DVCLIVE_RESUME -from dvc.exceptions import DvcException +from dvc.env import DVC_EXP_BASELINE_REV, DVC_EXP_NAME from dvc.lock import LockError -from dvc.repo.experiments.exceptions import CheckpointExistsError, ExperimentExistsError +from dvc.repo.experiments.exceptions import ExperimentExistsError from dvc.repo.experiments.executor.base import BaseExecutor from dvc.repo.experiments.executor.local import WorkspaceExecutor from dvc.repo.experiments.refs import ExpRefInfo @@ -33,11 +32,9 @@ from dvc.repo.experiments.utils import ( EXEC_PID_DIR, EXEC_TMP_DIR, - exp_refs_by_rev, get_exp_rwlock, get_random_exp_name, ) -from dvc.ui import ui from dvc.utils.objects import cached_property from .utils import get_remote_executor_refs @@ -297,11 +294,10 @@ def logs( output. """ - def _stash_exp( # noqa: PLR0915, C901 + def _stash_exp( # noqa: C901 self, *args, params: Optional[Dict[str, List[str]]] = None, - resume_rev: Optional[str] = None, baseline_rev: Optional[str] = None, branch: Optional[str] = None, name: Optional[str] = None, @@ -312,7 +308,6 @@ def _stash_exp( # noqa: PLR0915, C901 Args: params: Dict mapping paths to `Hydra Override`_ patterns, provided via `exp run --set-param`. - resume_rev: Optional checkpoint resume rev. baseline_rev: Optional baseline rev for this experiment, defaults to the current SCM rev. branch: Optional experiment branch name. If specified, the @@ -335,17 +330,6 @@ def _stash_exp( # noqa: PLR0915, C901 if workspace: self.stash.apply(workspace) - if resume_rev: - # move HEAD to the resume rev so that the stashed diff - # only contains changes relative to resume rev - stash_head = resume_rev - self.scm.set_ref( - "HEAD", - resume_rev, - message=f"dvc: resume from HEAD {resume_rev[:7]}", - ) - self.scm.reset() - # update experiment params from command line if params: self._update_params(params) @@ -354,60 +338,6 @@ def _stash_exp( # noqa: PLR0915, C901 # & tempdir runs self._stash_commit_deps(*args, **kwargs) - if resume_rev: - if branch: - branch_name = ExpRefInfo.from_ref(branch).name - else: - branch_name = f"{resume_rev[:7]}" - if self.scm.is_dirty(untracked_files=False): - ui.write( - ( - "Modified checkpoint experiment based on " - f"'{branch_name}' will be created" - ), - ) - branch = None - elif not branch or self.scm.get_ref(branch) != resume_rev: - err_msg = [ - "Nothing to do for unchanged checkpoint " - f"'{resume_rev[:7]}'. " - ] - if branch: - err_msg.append( - "To resume from the head of this " - "experiment, use " - f"'dvc exp apply {branch_name}'." - ) - else: - names = [ - ref_info.name - for ref_info in exp_refs_by_rev( - self.scm, resume_rev - ) - ] - if len(names) > 3: - names[3:] = [f"... ({len(names) - 3} more)"] - err_msg.append( - "To resume an experiment containing this " - "checkpoint, apply one of these heads:\n" - "\t{}".format(", ".join(names)) - ) - raise DvcException("".join(err_msg)) - else: - ui.write( - "Existing checkpoint experiment " - f"'{branch_name}' will be resumed" - ) - if name: - logger.warning( - ( - "Ignoring option '--name %s' for resumed " - "experiment. Existing experiment name will" - "be preserved instead." - ), - name, - ) - # save additional repro command line arguments run_env = { DVC_EXP_BASELINE_REV: baseline_rev, @@ -415,8 +345,6 @@ def _stash_exp( # noqa: PLR0915, C901 if not name: name = get_random_exp_name(self.scm, baseline_rev) run_env[DVC_EXP_NAME] = name - if resume_rev: - run_env[DVCLIVE_RESUME] = "1" studio_repo_url = get_studio_repo_url() if studio_repo_url is not None: run_env[STUDIO_REPO_URL] = studio_repo_url @@ -441,12 +369,6 @@ def _stash_exp( # noqa: PLR0915, C901 baseline_rev[:7], ) finally: - if resume_rev: - # NOTE: this set_ref + reset() is equivalent to - # `git reset orig_head` (our SCM reset() only operates - # on HEAD rather than any arbitrary commit) - self.scm.set_ref("HEAD", orig_head, message="dvc: restore HEAD") - self.scm.reset() # Revert any of our changes before prior unstashing self.scm.reset(hard=True) @@ -629,10 +551,8 @@ def collect_git( ) -> Dict[str, str]: results = {} - def on_diverged(ref: str, checkpoint: bool): + def on_diverged(ref: str): ref_info = ExpRefInfo.from_ref(ref) - if checkpoint: - raise CheckpointExistsError(ref_info.name) raise ExperimentExistsError(ref_info.name) refs = get_remote_executor_refs(exp.scm, executor.git_url) diff --git a/dvc/repo/experiments/queue/tempdir.py b/dvc/repo/experiments/queue/tempdir.py index 27bcaab61f..fde8fc5ed5 100644 --- a/dvc/repo/experiments/queue/tempdir.py +++ b/dvc/repo/experiments/queue/tempdir.py @@ -94,8 +94,6 @@ def iter_active(self) -> Generator[QueueEntry, None, None]: def _reproduce_entry( self, entry: QueueEntry, executor: "BaseExecutor" ) -> Dict[str, Dict[str, str]]: - from dvc.stage.monitor import CheckpointKilledError - results: Dict[str, Dict[str, str]] = defaultdict(dict) exec_name = self._EXEC_NAME or entry.stash_rev infofile = self.get_infofile_path(exec_name) @@ -114,12 +112,6 @@ def _reproduce_entry( results[rev].update( self.collect_executor(self.repo.experiments, executor, exec_result) ) - except CheckpointKilledError: - results[rev].update( - self.collect_executor(self.repo.experiments, executor, exec_result) - ) - - return results except DvcException: raise except Exception as exc: # noqa: BLE001 diff --git a/dvc/repo/experiments/queue/utils.py b/dvc/repo/experiments/queue/utils.py index e1c9b8b3dd..45afdfc44c 100644 --- a/dvc/repo/experiments/queue/utils.py +++ b/dvc/repo/experiments/queue/utils.py @@ -4,12 +4,7 @@ from scmrepo.exceptions import SCMError from dvc.repo.experiments.executor.base import ExecutorInfo, TaskStatus -from dvc.repo.experiments.refs import ( - EXEC_CHECKPOINT, - EXEC_NAMESPACE, - EXPS_NAMESPACE, - EXPS_STASH, -) +from dvc.repo.experiments.refs import EXEC_NAMESPACE, EXPS_NAMESPACE, EXPS_STASH from dvc.repo.experiments.utils import get_exp_rwlock, iter_remote_refs logger = logging.getLogger(__name__) @@ -33,9 +28,7 @@ def get_remote_executor_refs(scm: "Git", remote_url: str) -> List[str]: remote_url, base=EXPS_NAMESPACE, ): - if ref == EXEC_CHECKPOINT or ( - not ref.startswith(EXEC_NAMESPACE) and ref != EXPS_STASH - ): + if not ref.startswith(EXEC_NAMESPACE) and ref != EXPS_STASH: refs.append(ref) return refs @@ -67,7 +60,7 @@ def fetch_running_exp_from_temp_dir( result[rev] = info.asdict() if info.git_url and fetch_refs and info.status > TaskStatus.PREPARING: - def on_diverged(_ref: str, _checkpoint: bool): + def on_diverged(_ref: str): return True executor = TempDirExecutor.from_info(info) diff --git a/dvc/repo/experiments/queue/workspace.py b/dvc/repo/experiments/queue/workspace.py index 063f1c54fb..43388d3137 100644 --- a/dvc/repo/experiments/queue/workspace.py +++ b/dvc/repo/experiments/queue/workspace.py @@ -94,8 +94,6 @@ def reproduce(self) -> Dict[str, Dict[str, str]]: def _reproduce_entry( self, entry: QueueEntry, executor: "BaseExecutor" ) -> Dict[str, Dict[str, str]]: - from dvc.stage.monitor import CheckpointKilledError - results: Dict[str, Dict[str, str]] = defaultdict(dict) exec_name = self._EXEC_NAME or entry.stash_rev infofile = self.get_infofile_path(exec_name) @@ -114,9 +112,6 @@ def _reproduce_entry( results[rev].update( self.collect_executor(self.repo.experiments, executor, exec_result) ) - except CheckpointKilledError: - # Checkpoint errors have already been logged - return {} except DvcException: raise except Exception as exc: # noqa: BLE001 diff --git a/dvc/repo/experiments/refs.py b/dvc/repo/experiments/refs.py index dd9c4b2159..45ee388684 100644 --- a/dvc/repo/experiments/refs.py +++ b/dvc/repo/experiments/refs.py @@ -14,7 +14,6 @@ CELERY_FAILED_STASH = f"{EXPS_NAMESPACE}/celery/failed" EXEC_NAMESPACE = f"{EXPS_NAMESPACE}/exec" EXEC_APPLY = f"{EXEC_NAMESPACE}/EXEC_APPLY" -EXEC_CHECKPOINT = f"{EXEC_NAMESPACE}/EXEC_CHECKPOINT" EXEC_BRANCH = f"{EXEC_NAMESPACE}/EXEC_BRANCH" EXEC_BASELINE = f"{EXEC_NAMESPACE}/EXEC_BASELINE" EXEC_HEAD = f"{EXEC_NAMESPACE}/EXEC_HEAD" diff --git a/dvc/repo/experiments/run.py b/dvc/repo/experiments/run.py index 5975a375d1..a60c742755 100644 --- a/dvc/repo/experiments/run.py +++ b/dvc/repo/experiments/run.py @@ -68,9 +68,6 @@ def run( # noqa: C901 else: sweeps = [path_overrides] - if not kwargs.get("checkpoint_resume", None): - kwargs["reset"] = True - for idx, sweep_overrides in enumerate(sweeps): if hydra_sweep and name_prefix is not None: kwargs["name"] = f"{name_prefix}-{idx+1}" diff --git a/dvc/repo/experiments/show.py b/dvc/repo/experiments/show.py index 767fa56b42..ee1ad46322 100644 --- a/dvc/repo/experiments/show.py +++ b/dvc/repo/experiments/show.py @@ -153,17 +153,12 @@ def _collect_from_repo( def _collect_complete_experiment( repo: "Repo", - baseline: str, - exp_rev: str, running: Dict[str, Any], revs: List[str], **kwargs, ) -> Dict[str, Dict[str, Any]]: results: "OrderedDict[str, Dict[str, Any]]" = OrderedDict() - checkpoint = len(revs) > 1 - prev = "" - for rev in revs: status = ExpStatus.Running if rev in running else ExpStatus.Success collected_exp = collect_experiment_commit( @@ -175,24 +170,10 @@ def _collect_complete_experiment( ) if _is_scm_error(collected_exp): return {} - if checkpoint: - exp = {"checkpoint_tip": exp_rev} - if prev: - results[prev]["data"][ # type: ignore[unreachable] - "checkpoint_parent" - ] = rev - if rev in results: - results[rev]["data"].update(exp) - results.move_to_end(rev) - else: - exp.update(collected_exp["data"]) - else: - exp = collected_exp["data"] + exp = collected_exp["data"] if rev not in results: results[rev] = {"data": exp} - prev = rev - if checkpoint and prev: - results[prev]["data"]["checkpoint_parent"] = baseline + return results @@ -232,15 +213,13 @@ def _collect_branch( continue commits.append((ref, commit, exp_rev, revs)) - for exp_ref, _, exp_rev, revs in sorted( + for exp_ref, _, _, revs in sorted( commits, key=lambda x: x[1].commit_time, reverse=True ): ref_info = ExpRefInfo.from_ref(exp_ref) assert ref_info.baseline_sha == baseline collected_exp = _collect_complete_experiment( repo, - baseline=baseline, - exp_rev=exp_rev, running=running, revs=revs, **kwargs, @@ -387,39 +366,6 @@ def update_new( to_dict[baseline][rev] = to_dict[baseline].get(rev, experiment) -def move_properties_to_head(result: Dict[str, Dict[str, Dict[str, Any]]]): - for _, baseline_results in result.items(): - checkpoint: bool = False - head: Dict[str, Any] = {} - for rev, rev_data in baseline_results.items(): - if ( - "data" not in rev_data - or rev_data["data"].get("checkpoint_tip", None) is None - ): - checkpoint = False - head = {} - continue - - rev_result: Dict[str, Any] = rev_data["data"] - if ( - checkpoint is True - and rev_result["checkpoint_tip"] == head["checkpoint_tip"] - ): - if "name" in rev_result and "name" not in head: - head["name"] = rev_result["name"] - del rev_result["name"] - if rev_result["executor"]: - if not head["executor"]: - head["executor"] = rev_result["executor"] - rev_result["executor"] = None - if rev_result["status"] == ExpStatus.Running.name: - head["status"] = ExpStatus.Running.name - rev_result["status"] = ExpStatus.Success.name - elif rev_result["checkpoint_tip"] == rev: - head = rev_result - checkpoint = True - - def show( # noqa: PLR0913 repo: "Repo", all_branches=False, @@ -512,6 +458,4 @@ def show( # noqa: PLR0913 if not sha_only: update_names(repo, branch_names, res) - move_properties_to_head(res) - return res diff --git a/dvc/repo/experiments/stash.py b/dvc/repo/experiments/stash.py index 332701bdcf..1ac1cf86bb 100644 --- a/dvc/repo/experiments/stash.py +++ b/dvc/repo/experiments/stash.py @@ -21,7 +21,7 @@ class ExpStashEntry(NamedTuple): is not pushed onto the stash ref. head_rev: HEAD Git commit to be checked out for this experiment. baseline_rev: Experiment baseline commit. - branch: Optional exp (checkpoint) branch name for this experiment. + branch: Optional branch name for this experiment. name: Optional exp name. """ diff --git a/dvc/repo/experiments/utils.py b/dvc/repo/experiments/utils.py index afd1a1a5a2..bd7e5c160a 100644 --- a/dvc/repo/experiments/utils.py +++ b/dvc/repo/experiments/utils.py @@ -26,7 +26,6 @@ EXEC_APPLY, EXEC_BASELINE, EXEC_BRANCH, - EXEC_CHECKPOINT, EXPS_NAMESPACE, ITER_SKIP_NAMESPACES, STASHES, @@ -221,7 +220,6 @@ def exp_commits( def remove_exp_refs(scm: "Git", ref_infos: Iterable[ExpRefInfo]): exec_branch = scm.get_ref(EXEC_BRANCH, follow=False) exec_apply = scm.get_ref(EXEC_APPLY) - exec_checkpoint = scm.get_ref(EXEC_CHECKPOINT) for ref_info in ref_infos: ref = scm.get_ref(str(ref_info)) @@ -229,8 +227,6 @@ def remove_exp_refs(scm: "Git", ref_infos: Iterable[ExpRefInfo]): scm.remove_ref(EXEC_BRANCH) if exec_apply and exec_apply == ref: scm.remove_ref(EXEC_APPLY) - if exec_checkpoint and exec_checkpoint == ref: - scm.remove_ref(EXEC_CHECKPOINT) scm.remove_ref(str(ref_info)) diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index df5df0389d..a29def267b 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -1,10 +1,8 @@ import logging -from functools import partial from typing import TYPE_CHECKING, Iterator, List -from dvc.exceptions import DvcException, ReproductionError +from dvc.exceptions import ReproductionError from dvc.repo.scm_context import scm_context -from dvc.stage.exceptions import CheckpointKilledError from . import locked @@ -17,21 +15,6 @@ def _reproduce_stage(stage: "Stage", **kwargs) -> List["Stage"]: - def _run_callback(repro_callback): - stage.dump(update_pipeline=False) - _track_stage(stage) - repro_callback([stage]) - - checkpoint_func = kwargs.pop("checkpoint_func", None) - if stage.is_checkpoint: - if checkpoint_func: - kwargs["checkpoint_func"] = partial(_run_callback, checkpoint_func) - else: - raise DvcException( - "Checkpoint stages are not supported in 'dvc repro'. " - "Checkpoint stages must be reproduced with 'dvc exp run'. " - ) - if stage.frozen and not stage.is_import: logger.warning( "%s is frozen. Its dependencies are not going to be reproduced.", @@ -43,10 +26,8 @@ def _run_callback(repro_callback): return [] if not kwargs.get("dry", False): - track = checkpoint_func is not None stage.dump(update_pipeline=False) - if track: - _track_stage(stage) + _track_stage(stage) return [stage] @@ -173,17 +154,11 @@ def _reproduce_stages( # noqa: C901 unchanged: List["Stage"] = [] # `ret` is used to add a cosmetic newline. ret: List["Stage"] = [] - checkpoint_func = kwargs.pop("checkpoint_func", None) - for i, stage in enumerate(steps): + for stage in steps: if ret: logger.info("") - if checkpoint_func: - kwargs["checkpoint_func"] = partial( - _repro_callback, checkpoint_func, unchanged - ) - try: ret = _reproduce_stage(stage, **kwargs) @@ -199,21 +174,6 @@ def _reproduce_stages( # noqa: C901 if ret: result.extend(ret) - except CheckpointKilledError: - result.append(stage) - logger.warning( - ( - "Checkpoint stage '%s' was interrupted remaining stages in" - " pipeline will not be reproduced." - ), - stage.addressing, - ) - logger.warning( - "skipped stages '%s'", - ", ".join(s.addressing for s in steps[i + 1 :]), - ) - - break except Exception as exc: # noqa: BLE001 raise ReproductionError(stage.addressing) from exc diff --git a/dvc/schema.py b/dvc/schema.py index 5b65000b5e..d828fea567 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -56,7 +56,6 @@ **ANNOTATION_SCHEMA, # type: ignore[arg-type] Output.PARAM_CACHE: bool, Output.PARAM_PERSIST: bool, - Output.PARAM_CHECKPOINT: bool, Output.PARAM_REMOTE: str, Output.PARAM_PUSH: bool, } diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index a7ff66360e..a6f15ac575 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -277,14 +277,6 @@ def is_repo_import(self) -> bool: def is_versioned_import(self) -> bool: return self.is_import and self.deps[0].fs.version_aware - @property - def is_checkpoint(self) -> bool: - """ - A stage containing checkpoint outs is always considered as changed - since the checkpoint out is a circular dependency. - """ - return any(out.checkpoint for out in self.outs) - def short_description(self) -> Optional["str"]: desc: Optional["str"] = None if self.desc: @@ -294,36 +286,20 @@ def short_description(self) -> Optional["str"]: return line.strip() return desc - def _read_env(self, out, checkpoint_func=None) -> Env: - env: Env = {} - if out.checkpoint and checkpoint_func: - from dvc.env import DVC_CHECKPOINT - - env.update({DVC_CHECKPOINT: "1"}) - return env - - def env(self, checkpoint_func=None) -> Env: + def env(self) -> Env: from dvc.env import DVC_ROOT env: Env = {} if self.repo: env.update({DVC_ROOT: self.repo.root_dir}) - for out in self.outs: - current = self._read_env(out, checkpoint_func=checkpoint_func) - if any( - env.get(key) != current.get(key) - for key in set(env.keys()).intersection(current.keys()) - ): - raise DvcException("Conflicting values for env variable") - env.update(current) return env def changed_deps(self) -> bool: if self.frozen: return False - if self.is_callback or self.always_changed or self.is_checkpoint: + if self.is_callback or self.always_changed: return True return self._changed_deps() @@ -380,7 +356,7 @@ def changed(self) -> bool: def remove_outs(self, ignore_remove=False, force=False) -> None: """Used mainly for `dvc remove --outs` and :func:`Stage.reproduce`.""" for out in self.outs: - if (out.persist or out.checkpoint) and not force: + if out.persist and not force: out.unprotect() continue @@ -517,7 +493,7 @@ def save_outs(self, allow_missing: bool = False): try: out.save() except OutputDoesNotExistError: - if not (allow_missing or out.checkpoint): + if not allow_missing: raise if merge_versioned: old_out = old_outs.get(out.def_path) @@ -553,7 +529,7 @@ def commit(self, allow_missing=False, filter_info=None, **kwargs) -> None: try: out.commit(filter_info=filter_info, **kwargs) except OutputDoesNotExistError: - if not (allow_missing or out.checkpoint): + if not allow_missing: raise except CacheLinkError: link_failures.append(out.fs_path) @@ -584,7 +560,7 @@ def run( self._check_missing_outputs() if not dry: - if kwargs.get("checkpoint_func", None) or no_download: + if no_download: allow_missing = True no_cache_outs = any( @@ -631,7 +607,7 @@ def checkout( for out in self.filter_outs(kwargs.get("filter_info")): key, outs = self._checkout( out, - allow_missing=allow_missing or self.is_checkpoint, + allow_missing=allow_missing, **kwargs, ) if key: @@ -686,7 +662,7 @@ def _status_outs(self, ret, filter_info) -> None: ret.append({"changed outs": outs_status}) def _status_always_changed(self, ret) -> None: - if self.is_callback or self.always_changed or self.is_checkpoint: + if self.is_callback or self.always_changed: ret.append("always changed") def _status_stage(self, ret) -> None: diff --git a/dvc/stage/exceptions.py b/dvc/stage/exceptions.py index 5f9ac50d6d..d1c867c318 100644 --- a/dvc/stage/exceptions.py +++ b/dvc/stage/exceptions.py @@ -9,14 +9,6 @@ def __init__(self, cmd, status=None): super().__init__(msg) -class CheckpointKilledError(DvcException): - def __init__(self, cmd, status=None): - msg = f"failed to finish: {cmd}" - if status is not None: - msg += f", exited with {status}" - super().__init__(msg) - - class StageFileDoesNotExistError(DvcException): DVC_IGNORED = "is dvc-ignored" DOES_NOT_EXIST = "does not exist" diff --git a/dvc/stage/monitor.py b/dvc/stage/monitor.py deleted file mode 100644 index 4a62e034e6..0000000000 --- a/dvc/stage/monitor.py +++ /dev/null @@ -1,121 +0,0 @@ -import functools -import logging -import os -import subprocess -import threading -from dataclasses import dataclass -from typing import TYPE_CHECKING, Callable, List - -from dvc.stage.decorators import relock_repo -from dvc.stage.exceptions import CheckpointKilledError - -if TYPE_CHECKING: - from dvc.stage import Stage - - -logger = logging.getLogger(__name__) - - -@dataclass -class MonitorTask: - stage: "Stage" - execute: Callable - proc: subprocess.Popen - done: threading.Event = threading.Event() - updated: threading.Event = threading.Event() - - @property - def name(self) -> str: - raise NotImplementedError - - @property - def SIGNAL_FILE(self) -> str: # noqa: N802 - raise NotImplementedError - - @property - def error_cls(self) -> type: - raise NotImplementedError - - @property - def signal_path(self) -> str: - return os.path.join(self.stage.repo.tmp_dir, self.SIGNAL_FILE) - - def after_run(self): - pass - - -class CheckpointTask(MonitorTask): - name = "checkpoint" - SIGNAL_FILE = "DVC_CHECKPOINT" - error_cls = CheckpointKilledError - - def __init__(self, stage: "Stage", callback_func: Callable, proc: subprocess.Popen): - super().__init__( - stage, - functools.partial(CheckpointTask._run_callback, stage, callback_func), - proc, - ) - - @staticmethod - @relock_repo - def _run_callback(stage, callback_func): - stage.save(allow_missing=True) - stage.commit(allow_missing=True) - stage.unprotect_outs() - logger.debug("Running checkpoint callback for stage '%s'", stage) - callback_func() - - -class Monitor: - AWAIT: float = 1.0 - - def __init__(self, tasks: List[CheckpointTask]): - self.done = threading.Event() - self.tasks = tasks - self.monitor_thread = threading.Thread( - target=Monitor._loop, args=(self.tasks, self.done) - ) - - def __enter__(self): - self.monitor_thread.start() - - def __exit__(self, exc_type, exc_val, exc_tb): - self.done.set() - self.monitor_thread.join() - for t in self.tasks: - t.after_run() - - @staticmethod - def kill(proc): - if os.name == "nt": - return Monitor._kill_nt(proc) - proc.terminate() - proc.wait() - - @staticmethod - def _kill_nt(proc): - # windows stages are spawned with shell=True, proc is the shell process - # and not the actual stage process - we have to kill the entire tree - subprocess.call(["taskkill", "/F", "/T", "/PID", str(proc.pid)]) - - @staticmethod - def _loop(tasks: List[MonitorTask], done: threading.Event): - while True: - for task in tasks: - if os.path.exists(task.signal_path): - try: - task.execute() - task.updated.set() - except Exception: # pylint: disable=broad-except - logger.exception( - "Error running '%s' task, '%s' will be aborted", - task.name, - task.stage, - ) - Monitor.kill(task.proc) - finally: - logger.debug("Removing signal file for '%s' task", task.name) - os.remove(task.signal_path) - - if done.wait(Monitor.AWAIT): - return diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 18d9acfbc0..2ad979c5e2 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -3,17 +3,12 @@ import signal import subprocess import threading -from typing import TYPE_CHECKING, List -from dvc.stage.monitor import CheckpointTask, Monitor from dvc.utils import fix_env from .decorators import unlocked_repo from .exceptions import StageCmdFailedError -if TYPE_CHECKING: - from dvc.stage import Stage - logger = logging.getLogger(__name__) @@ -43,10 +38,9 @@ def _enforce_cmd_list(cmd): return cmd if isinstance(cmd, list) else cmd.splitlines() -def prepare_kwargs(stage, checkpoint_func=None, run_env=None): +def prepare_kwargs(stage, run_env=None): kwargs = {"cwd": stage.wdir, "env": fix_env(None), "close_fds": True} - kwargs["env"].update(stage.env(checkpoint_func=checkpoint_func)) if run_env: kwargs["env"].update(run_env) @@ -75,7 +69,7 @@ def get_executable(): return (os.getenv("SHELL") or "/bin/sh") if os.name != "nt" else None -def _run(stage: "Stage", executable, cmd, checkpoint_func, **kwargs): +def _run(executable, cmd, **kwargs): # pylint: disable=protected-access main_thread = isinstance( threading.current_thread(), @@ -90,36 +84,19 @@ def _run(stage: "Stage", executable, cmd, checkpoint_func, **kwargs): if main_thread: old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) - tasks = _get_monitor_tasks(stage, checkpoint_func, p) - - if tasks: - with Monitor(tasks): - p.communicate() - else: - p.communicate() + p.communicate() if p.returncode != 0: - for t in tasks: - if t.updated.is_set(): - raise t.error_cls(cmd, p.returncode) raise StageCmdFailedError(cmd, p.returncode) finally: if old_handler: signal.signal(signal.SIGINT, old_handler) -def _get_monitor_tasks(stage, checkpoint_func, proc) -> List[CheckpointTask]: - result = [] - if checkpoint_func: - result.append(CheckpointTask(stage, checkpoint_func, proc)) - - return result - - -def cmd_run(stage, dry=False, checkpoint_func=None, run_env=None): +def cmd_run(stage, dry=False, run_env=None): logger.info("Running stage '%s':", stage.addressing) commands = _enforce_cmd_list(stage.cmd) - kwargs = prepare_kwargs(stage, checkpoint_func=checkpoint_func, run_env=run_env) + kwargs = prepare_kwargs(stage, run_env=run_env) executable = get_executable() if not dry: @@ -130,13 +107,11 @@ def cmd_run(stage, dry=False, checkpoint_func=None, run_env=None): if dry: continue - _run(stage, executable, cmd, checkpoint_func=checkpoint_func, **kwargs) + _run(executable, cmd, **kwargs) -def run_stage( - stage, dry=False, force=False, checkpoint_func=None, run_env=None, **kwargs -): - if not (force or checkpoint_func): +def run_stage(stage, dry=False, force=False, run_env=None, **kwargs): + if not force: from .cache import RunCacheNotFoundError try: @@ -148,4 +123,4 @@ def run_stage( stage.save_deps() run = cmd_run if dry else unlocked_repo(cmd_run) - run(stage, dry=dry, checkpoint_func=checkpoint_func, run_env=run_env) + run(stage, dry=dry, run_env=run_env) diff --git a/dvc/stage/serialize.py b/dvc/stage/serialize.py index 8c6a7de2e9..0e16349351 100644 --- a/dvc/stage/serialize.py +++ b/dvc/stage/serialize.py @@ -34,7 +34,6 @@ PARAM_METRIC = Output.PARAM_METRIC PARAM_PLOT = Output.PARAM_PLOT PARAM_PERSIST = Output.PARAM_PERSIST -PARAM_CHECKPOINT = Output.PARAM_CHECKPOINT PARAM_DESC = Annotation.PARAM_DESC PARAM_REMOTE = Output.PARAM_REMOTE PARAM_PUSH = Output.PARAM_PUSH @@ -49,8 +48,6 @@ def _get_flags(out): if not out.use_cache: yield PARAM_CACHE, False - if out.checkpoint: - yield PARAM_CHECKPOINT, True if out.persist: yield PARAM_PERSIST, True if out.plot and isinstance(out.plot, dict): diff --git a/dvc/stage/utils.py b/dvc/stage/utils.py index 6d2e2ade46..568937f488 100644 --- a/dvc/stage/utils.py +++ b/dvc/stage/utils.py @@ -62,7 +62,6 @@ def fill_stage_outputs(stage, **kwargs): "plots_persist_no_cache", "outs_no_cache", "outs", - "checkpoints", ] stage.outs = [] @@ -75,7 +74,6 @@ def fill_stage_outputs(stage, **kwargs): persist="persist" in key, metric="metrics" in key, plot="plots" in key, - checkpoint="checkpoints" in key, ) @@ -169,7 +167,6 @@ def compute_md5(stage): stage.PARAM_FROZEN, Output.PARAM_METRIC, Output.PARAM_PERSIST, - Output.PARAM_CHECKPOINT, Meta.PARAM_ISEXEC, Meta.PARAM_SIZE, Meta.PARAM_NFILES, @@ -240,7 +237,6 @@ def prepare_file_path(kwargs) -> str: kwargs.get("plots_no_cache", []), kwargs.get("outs_persist", []), kwargs.get("outs_persist_no_cache", []), - kwargs.get("checkpoints", []), ) ) diff --git a/tests/func/experiments/conftest.py b/tests/func/experiments/conftest.py index 7358dc0989..df4cffb4de 100644 --- a/tests/func/experiments/conftest.py +++ b/tests/func/experiments/conftest.py @@ -1,9 +1,7 @@ import pytest from tests.unit.repo.experiments.conftest import ( # noqa, pylint disable=unused-argument - checkpoint_stage, exp_stage, - failed_checkpoint_stage, failed_exp_stage, session_app, session_queue, diff --git a/tests/func/experiments/test_checkpoints.py b/tests/func/experiments/test_checkpoints.py deleted file mode 100644 index 302ffbfbe3..0000000000 --- a/tests/func/experiments/test_checkpoints.py +++ /dev/null @@ -1,272 +0,0 @@ -import logging - -import pytest -from funcy import first - -from dvc.config import NoRemoteError -from dvc.env import DVC_EXP_AUTO_PUSH, DVC_EXP_GIT_REMOTE -from dvc.exceptions import DvcException -from dvc.repo.experiments.exceptions import MultipleBranchError -from dvc.repo.experiments.executor.base import BaseExecutor -from dvc.repo.experiments.refs import EXEC_APPLY, EXEC_CHECKPOINT -from dvc.repo.experiments.utils import exp_refs_by_rev -from dvc.scm import InvalidRemoteSCMRepo, RevError - - -@pytest.mark.parametrize("links", ["reflink,copy", "hardlink,symlink"]) -def test_new_checkpoint(tmp_dir, scm, dvc, checkpoint_stage, mocker, workspace, links): - with dvc.config.edit() as conf: - conf["cache"]["type"] = links - - new_mock = mocker.spy(dvc.experiments, "new") - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - exp = first(results) - - new_mock.assert_called_once() - with dvc.switch(exp): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 2" - - if workspace: - assert scm.get_ref(EXEC_APPLY) == exp - assert scm.get_ref(EXEC_CHECKPOINT) == exp - if workspace: - assert (tmp_dir / "foo").read_text().strip() == str(checkpoint_stage.iterations) - assert (tmp_dir / "metrics.yaml").read_text().strip() == "foo: 2" - - -@pytest.mark.parametrize("checkpoint_resume", [None, "foo"]) -def test_resume_checkpoint( - tmp_dir, scm, dvc, checkpoint_stage, checkpoint_resume, workspace -): - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - - with pytest.raises(RevError, match="unknown Git revision 'abc1234'"): - dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume="abc1234", - tmp_dir=not workspace, - ) - - if checkpoint_resume: - checkpoint_resume = first(results) - - if not workspace: - dvc.experiments.apply(first(results)) - results = dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume=checkpoint_resume, - tmp_dir=not workspace, - ) - exp = first(results) - - with dvc.switch(exp): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 2" - - if workspace: - assert scm.get_ref(EXEC_APPLY) == exp - assert scm.get_ref(EXEC_CHECKPOINT) == exp - - -def test_reset_checkpoint(tmp_dir, scm, dvc, checkpoint_stage, caplog, workspace): - dvc.experiments.run(checkpoint_stage.addressing, tmp_dir=not workspace) - - results = dvc.experiments.run( - checkpoint_stage.addressing, - params=["foo=2"], - tmp_dir=not workspace, - reset=True, - ) - exp = first(results) - - with dvc.switch(exp): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 2" - - if workspace: - assert scm.get_ref(EXEC_APPLY) == exp - assert scm.get_ref(EXEC_CHECKPOINT) == exp - - -def test_resume_branch(tmp_dir, scm, dvc, checkpoint_stage, workspace): - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - branch_rev = first(results) - if not workspace: - dvc.experiments.apply(branch_rev) - - results = dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume=branch_rev, - tmp_dir=not workspace, - ) - checkpoint_a = first(results) - - dvc.experiments.apply(branch_rev) - results = dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume=branch_rev, - params=["foo=100"], - tmp_dir=not workspace, - ) - checkpoint_b = first(results) - - with dvc.switch(checkpoint_a): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 2" - - with dvc.switch(checkpoint_b): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 100" - - with pytest.raises(MultipleBranchError): - dvc.experiments.get_branch_by_rev(branch_rev) - - assert branch_rev == dvc.experiments.scm.gitpython.repo.git.merge_base( - checkpoint_a, checkpoint_b - ) - - -def test_resume_non_head_checkpoint(tmp_dir, scm, dvc, checkpoint_stage, workspace): - orig_head = scm.get_rev() - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - checkpoint_head = first(results) - orig_branch = dvc.experiments.get_branch_by_rev(checkpoint_head) - - rev = list(scm.branch_revs(checkpoint_head, orig_head))[-1] - dvc.experiments.apply(rev) - - with pytest.raises(DvcException, match="Nothing to do for unchanged checkpoint"): - dvc.experiments.run(checkpoint_stage.addressing, tmp_dir=not workspace) - - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=100"], tmp_dir=not workspace - ) - new_head = first(results) - assert orig_branch != dvc.experiments.get_branch_by_rev(new_head) - - -@pytest.fixture -def clear_env(monkeypatch): - yield - monkeypatch.delenv(DVC_EXP_GIT_REMOTE, raising=False) - monkeypatch.delenv(DVC_EXP_AUTO_PUSH, raising=False) - - -@pytest.mark.parametrize("use_url", [True, False]) -def test_auto_push_during_iterations( - tmp_dir, - scm, - dvc, - checkpoint_stage, - git_upstream, - local_remote, - use_url, - monkeypatch, - mocker, - clear_env, -): - # set up remote repo - remote = git_upstream.url if use_url else git_upstream.remote - git_upstream.tmp_dir.scm.fetch_refspecs(str(tmp_dir), ["master:master"]) - monkeypatch.setenv(DVC_EXP_GIT_REMOTE, remote) - auto_push_spy = mocker.spy(BaseExecutor, "_auto_push") - - # without auto push - results = dvc.experiments.run(checkpoint_stage.addressing) - assert auto_push_spy.call_count == 0 - - # add auto push - monkeypatch.setenv(DVC_EXP_AUTO_PUSH, "true") - results = dvc.experiments.run(checkpoint_stage.addressing) - assert (tmp_dir / "foo").read_text() == "4" - exp = first(results) - ref_info = first(exp_refs_by_rev(scm, exp)) - assert git_upstream.tmp_dir.scm.get_ref(str(ref_info)) == exp - - # Assert 3 pushes: 2 checkpoints and final commit - assert auto_push_spy.call_count == 3 - assert auto_push_spy.call_args[0][2] == remote - - -def test_auto_push_error_url( - dvc, scm, checkpoint_stage, local_remote, monkeypatch, clear_env -): - monkeypatch.setenv(DVC_EXP_GIT_REMOTE, "true") - monkeypatch.setenv(DVC_EXP_AUTO_PUSH, "true") - with pytest.raises(InvalidRemoteSCMRepo): - dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - - -def test_auto_push_no_remote( - dvc, scm, checkpoint_stage, git_upstream, monkeypatch, clear_env -): - monkeypatch.setenv(DVC_EXP_GIT_REMOTE, git_upstream.url) - monkeypatch.setenv(DVC_EXP_AUTO_PUSH, "true") - with pytest.raises(NoRemoteError): - dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - - -def test_auto_push_self_remote( - tmp_dir, - dvc, - scm, - checkpoint_stage, - local_remote, - caplog, - monkeypatch, - clear_env, -): - root_dir = str(tmp_dir) - monkeypatch.setenv(DVC_EXP_GIT_REMOTE, root_dir) - monkeypatch.setenv(DVC_EXP_AUTO_PUSH, "true") - assert dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) != {} - - with caplog.at_level(logging.WARNING, logger="dvc.repo.experiments"): - assert ( - f"'{root_dir}' points to the current Git repo, experiment " - "Git refs will not be pushed. But DVC cache and run cache will " - "automatically be pushed to the default DVC remote (if any) " - "on each experiment commit." in caplog.text - ) - - -def test_tmp_dir_failed_checkpoint(tmp_dir, scm, dvc, failed_checkpoint_stage, caplog): - baseline = scm.get_rev() - - results = dvc.experiments.run( - failed_checkpoint_stage.addressing, params=["foo=2"], tmp_dir=True - ) - exp = first(results) - - result = dvc.experiments.show()[baseline] - # Assert 4 rows: baseline, 2 checkpoints, and final commit - assert len(result) == 4 - assert exp in result - assert ( - "Checkpoint stage 'failed-checkpoint-file' was interrupted remaining " - "stages in pipeline will not be reproduced." in caplog.text - ) diff --git a/tests/func/experiments/test_remote.py b/tests/func/experiments/test_remote.py index 5251cb71cc..8b49c95325 100644 --- a/tests/func/experiments/test_remote.py +++ b/tests/func/experiments/test_remote.py @@ -1,5 +1,3 @@ -import os - import pytest from funcy import first @@ -83,24 +81,6 @@ def test_push_diverged(tmp_dir, scm, dvc, git_upstream, exp_stage): assert git_upstream.tmp_dir.scm.get_ref(str(ref_info)) == exp -def test_push_checkpoint(tmp_dir, scm, dvc, git_upstream, checkpoint_stage): - results = dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - exp_a = first(results) - ref_info_a = first(exp_refs_by_rev(scm, exp_a)) - - dvc.experiments.push(git_upstream.remote, [ref_info_a.name], force=True) - assert git_upstream.tmp_dir.scm.get_ref(str(ref_info_a)) == exp_a - - results = dvc.experiments.run(checkpoint_stage.addressing, checkpoint_resume=exp_a) - exp_b = first(results) - ref_info_b = first(exp_refs_by_rev(scm, exp_b)) - - tmp_dir.scm_gen("new", "new", commit="new") - - dvc.experiments.push(git_upstream.remote, [ref_info_b.name], force=True) - assert git_upstream.tmp_dir.scm.get_ref(str(ref_info_b)) == exp_b - - def test_push_ambiguous_name(tmp_dir, scm, dvc, git_upstream, exp_stage): from dvc.exceptions import InvalidArgumentError @@ -242,23 +222,6 @@ def test_pull_diverged(tmp_dir, scm, dvc, git_downstream, exp_stage): assert git_downstream.tmp_dir.scm.get_ref(str(ref_info)) == exp -def test_pull_checkpoint(tmp_dir, scm, dvc, git_downstream, checkpoint_stage): - results = dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - exp_a = first(results) - ref_info_a = first(exp_refs_by_rev(scm, exp_a)) - - downstream_exp = git_downstream.tmp_dir.dvc.experiments - downstream_exp.pull(git_downstream.remote, [ref_info_a.name], force=True) - assert git_downstream.tmp_dir.scm.get_ref(str(ref_info_a)) == exp_a - - results = dvc.experiments.run(checkpoint_stage.addressing, checkpoint_resume=exp_a) - exp_b = first(results) - ref_info_b = first(exp_refs_by_rev(scm, exp_b)) - - downstream_exp.pull(git_downstream.remote, [ref_info_b.name], force=True) - assert git_downstream.tmp_dir.scm.get_ref(str(ref_info_b)) == exp_b - - def test_pull_ambiguous_name(tmp_dir, scm, dvc, git_downstream, exp_stage): from dvc.exceptions import InvalidArgumentError @@ -284,36 +247,6 @@ def test_pull_ambiguous_name(tmp_dir, scm, dvc, git_downstream, exp_stage): assert git_downstream.tmp_dir.scm.get_ref(str(ref_info_a)) == exp_a -def test_push_pull_cache( - tmp_dir, scm, dvc, git_upstream, checkpoint_stage, local_remote -): - from dvc.utils.fs import remove - from tests.func.test_diff import digest - - remote = git_upstream.remote - results = dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - exp = first(results) - ref_info = first(exp_refs_by_rev(scm, exp)) - - dvc.experiments.push(remote, [ref_info.name], push_cache=True) - for x in range(2, checkpoint_stage.iterations + 1): - hash_ = digest(str(x)) - path = os.path.join(local_remote.url, hash_[:2], hash_[2:]) - assert os.path.exists(path) - with open(path, encoding="utf-8") as f: - assert f.read() == str(x) - - remove(dvc.cache.local.path) - - dvc.experiments.pull(remote, [ref_info.name], pull_cache=True) - for x in range(2, checkpoint_stage.iterations + 1): - hash_ = digest(str(x)) - path = os.path.join(dvc.cache.local.path, hash_[:2], hash_[2:]) - assert os.path.exists(path) - with open(path, encoding="utf-8") as f: - assert f.read() == str(x) - - def test_auth_error_list(tmp_dir, scm, dvc, http_auth_patch): from dvc.scm import GitAuthError diff --git a/tests/func/experiments/test_set_params.py b/tests/func/experiments/test_set_params.py index d708d6f268..d46662d8db 100644 --- a/tests/func/experiments/test_set_params.py +++ b/tests/func/experiments/test_set_params.py @@ -118,7 +118,6 @@ def test_hydra_sweep( patched.assert_any_call( mocker.ANY, params=e, - reset=True, targets=None, ) diff --git a/tests/func/experiments/test_show.py b/tests/func/experiments/test_show.py index 7c98fa957e..175d83c396 100644 --- a/tests/func/experiments/test_show.py +++ b/tests/func/experiments/test_show.py @@ -1,6 +1,5 @@ import logging import os -import random from datetime import datetime from unittest.mock import ANY @@ -11,7 +10,7 @@ from dvc.cli import main from dvc.repo.experiments.executor.base import BaseExecutor, ExecutorInfo, TaskStatus from dvc.repo.experiments.queue.base import QueueEntry -from dvc.repo.experiments.refs import CELERY_STASH, ExpRefInfo +from dvc.repo.experiments.refs import CELERY_STASH from dvc.repo.experiments.show import _CachedError from dvc.repo.experiments.utils import EXEC_PID_DIR, EXEC_TMP_DIR, exp_refs_by_rev from dvc.utils import relpath @@ -217,81 +216,6 @@ def test_show_failed_experiment(tmp_dir, scm, dvc, failed_exp_stage, test_queue) assert exp == expected_failed -@pytest.mark.vscode -@pytest.mark.parametrize("workspace", [True, False]) -def test_show_checkpoint(tmp_dir, scm, dvc, checkpoint_stage, capsys, workspace): - baseline_rev = scm.get_rev() - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - exp_rev = first(results) - - results = dvc.experiments.show()[baseline_rev] - # Assert 4 rows: baseline, 2 checkpoints, and final commit - assert len(results) == checkpoint_stage.iterations + 2 - - checkpoints = [] - for rev, exp in results.items(): - if rev != "baseline": - checkpoints.append(rev) - assert exp["data"]["checkpoint_tip"] == exp_rev - - capsys.readouterr() - assert main(["exp", "show", "--no-pager"]) == 0 - cap = capsys.readouterr() - - for i, rev in enumerate(checkpoints): - if i == 0: - name = dvc.experiments.get_exact_name([rev])[rev] - name = f"{rev[:7]} [{name}]" - fs = "╓" - elif i == len(checkpoints) - 1: - name = rev[:7] - fs = "╨" - else: - name = rev[:7] - fs = "╟" - assert f"{fs} {name}" in cap.out - - -@pytest.mark.vscode -@pytest.mark.parametrize("workspace", [True, False]) -def test_show_checkpoint_branch(tmp_dir, scm, dvc, checkpoint_stage, capsys, workspace): - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - branch_rev = first(results) - if not workspace: - dvc.experiments.apply(branch_rev) - - results = dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume=branch_rev, - tmp_dir=not workspace, - ) - checkpoint_a = first(results) - - dvc.experiments.apply(branch_rev) - results = dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume=branch_rev, - params=["foo=100"], - tmp_dir=not workspace, - ) - checkpoint_b = first(results) - - capsys.readouterr() - assert main(["exp", "show", "--no-pager"]) == 0 - cap = capsys.readouterr() - - for rev in (checkpoint_a, checkpoint_b): - ref = dvc.experiments.get_branch_by_rev(rev) - ref_info = ExpRefInfo.from_ref(ref) - name = f"{rev[:7]} [{ref_info.name}]" - assert f"╓ {name}" in cap.out - assert f"({branch_rev[:7]})" in cap.out - - def test_show_filter( tmp_dir, scm, @@ -545,54 +469,6 @@ def test_show_running_celery(tmp_dir, scm, dvc, exp_stage, mocker): assert results["workspace"]["baseline"]["data"]["status"] == "Success" -def test_show_running_checkpoint( - tmp_dir, scm, dvc, checkpoint_stage, mocker, test_queue -): - from dvc.repo.experiments.executor.local import TempDirExecutor - - baseline_rev = scm.get_rev() - dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], queue=True, name="foo" - ) - queue = dvc.experiments.celery_queue - entries = list(queue.iter_queued()) - - run_results = dvc.experiments.run(run_all=True) - test_queue.wait(["foo"]) - checkpoint_rev = first(run_results) - exp_ref = first(exp_refs_by_rev(scm, checkpoint_rev)) - - mocker.patch.object( - dvc.experiments.celery_queue, - "iter_active", - return_value=entries, - ) - mocker.patch.object( - dvc.experiments.celery_queue, - "iter_failed", - return_value=[], - ) - pidfile = queue.get_infofile_path(entries[0].stash_rev) - info = make_executor_info( - git_url="foo.git", - baseline_rev=baseline_rev, - location=TempDirExecutor.DEFAULT_LOCATION, - status=TaskStatus.RUNNING, - ) - os.makedirs(os.path.dirname(pidfile), exist_ok=True) - (tmp_dir / pidfile).dump_json(info.asdict()) - - mocker.patch.object(BaseExecutor, "fetch_exps", return_value=[str(exp_ref)]) - - results = dvc.experiments.show() - - checkpoint_res = get_in(results, [baseline_rev, checkpoint_rev, "data"]) - assert checkpoint_res["status"] == "Running" - assert checkpoint_res["executor"] == info.location - - assert results["workspace"]["baseline"]["data"]["status"] == "Success" - - def test_show_with_broken_repo(tmp_dir, scm, dvc, exp_stage, caplog): baseline_rev = scm.get_rev() exp1 = dvc.experiments.run(exp_stage.addressing, params=["foo=2"]) @@ -1015,40 +891,6 @@ def resolve_commit(rev): assert exp_rev_2 in experiments -@pytest.mark.vscode -def test_show_checkpoint_error(tmp_dir, scm, dvc, checkpoint_stage, mocker): - baseline_rev = scm.get_rev() - results = dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - exp_rev = first(results) - exp_ref = str(first(exp_refs_by_rev(scm, exp_rev))) - - results = dvc.experiments.show()[baseline_rev] - # Assert 4 rows: baseline, 2 checkpoints, and final commit - assert len(results) == checkpoint_stage.iterations + 2 - - checkpoints = {} - for rev in results: - if rev != "baseline": - checkpoints[rev] = scm.resolve_commit(rev) - checkpoints[exp_ref] = scm.resolve_commit(exp_ref) - checkpoints[baseline_rev] = scm.resolve_commit(baseline_rev) - - failed_rev = random.choice(list(checkpoints.keys())) - - def resolve_commit(rev): - if rev == failed_rev: - raise SCMError - return checkpoints[rev] - - mocker.patch.object( - scm, - "resolve_commit", - side_effect=mocker.MagicMock(side_effect=resolve_commit), - ) - results = dvc.experiments.show(force=True)[baseline_rev] - assert len(results) == 1 - - @pytest.mark.vscode def test_show_baseline_error(tmp_dir, scm, dvc, exp_stage, mocker): baseline_rev = scm.get_rev() diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index df93a54961..716f4c444b 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -1222,4 +1222,4 @@ def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy, mocker): assert dvc.status([stage.addressing]) == {stage.addressing: ["changed checksum"]} assert dvc.reproduce(stage.addressing)[0] == stage - m.assert_called_once_with(stage, checkpoint_func=None, dry=False, run_env=None) + m.assert_called_once_with(stage, dry=False, run_env=None) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index d190a3b79e..d1ffd7e82e 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -135,7 +135,7 @@ def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy, mocker): assert dvc.status([target]) == {target: ["changed command"]} assert dvc.reproduce(target)[0] == stage - m.assert_called_once_with(stage, checkpoint_func=None, dry=False, run_env=None) + m.assert_called_once_with(stage, dry=False, run_env=None) def test_repro_when_new_deps_is_added_in_dvcfile(tmp_dir, dvc, run_copy, copy_script): diff --git a/tests/func/test_stage.py b/tests/func/test_stage.py index 242155d321..1fff601967 100644 --- a/tests/func/test_stage.py +++ b/tests/func/test_stage.py @@ -7,9 +7,8 @@ from dvc.exceptions import OutputDuplicationError from dvc.fs import LocalFileSystem from dvc.output import Output -from dvc.repo import Repo, lock_repo +from dvc.repo import Repo from dvc.stage import PipelineStage, Stage -from dvc.stage.run import run_stage from dvc.stage.utils import compute_md5 from dvc.utils import dict_md5 from dvc.utils.serialize import dump_yaml, load_yaml @@ -318,24 +317,6 @@ def test_stage_remove_pointer_stage(tmp_dir, dvc, run_copy): assert not (tmp_dir / stage.relpath).exists() -@pytest.mark.parametrize("checkpoint", [True, False]) -def test_stage_run_checkpoint(tmp_dir, dvc, mocker, checkpoint): - stage = Stage(dvc, "stage.dvc", cmd="mycmd arg1 arg2") - mocker.patch.object(stage, "save") - - mock_cmd_run = mocker.patch("dvc.stage.run.cmd_run") - if checkpoint: - callback = mocker.Mock() - else: - callback = None - - with lock_repo(dvc): - run_stage(stage, checkpoint_func=callback) - mock_cmd_run.assert_called_with( - stage, checkpoint_func=callback, dry=False, run_env=None - ) - - def test_stage_add_duplicated_output(tmp_dir, dvc): tmp_dir.dvc_gen("foo", "foo") dvc.add("foo") diff --git a/tests/unit/command/test_experiments.py b/tests/unit/command/test_experiments.py index 3d4cb33983..a3871b4048 100644 --- a/tests/unit/command/test_experiments.py +++ b/tests/unit/command/test_experiments.py @@ -130,8 +130,6 @@ def test_experiments_run(dvc, scm, mocker): "run_all": False, "jobs": 1, "tmp_dir": False, - "checkpoint_resume": None, - "reset": False, "machine": None, } default_arguments.update(repro_arguments) diff --git a/tests/unit/command/test_run.py b/tests/unit/command/test_run.py index d328dedd14..b9fea4bd26 100644 --- a/tests/unit/command/test_run.py +++ b/tests/unit/command/test_run.py @@ -35,8 +35,6 @@ def test_run(mocker, dvc): "outs-persist", "--outs-persist-no-cache", "outs-persist-no-cache", - "--checkpoints", - "checkpoints", "--always-changed", "--params", "file:param1,param2", @@ -66,7 +64,6 @@ def test_run(mocker, dvc): plots_no_cache=["plots-no-cache"], outs_persist=["outs-persist"], outs_persist_no_cache=["outs-persist-no-cache"], - checkpoints=["checkpoints"], params=["file:param1,param2", "param3"], fname="file", wdir="wdir", @@ -99,7 +96,6 @@ def test_run_args_from_cli(mocker, dvc): plots_no_cache=[], outs_persist=[], outs_persist_no_cache=[], - checkpoints=[], params=[], fname=None, wdir=None, @@ -132,7 +128,6 @@ def test_run_args_with_spaces(mocker, dvc): plots_no_cache=[], outs_persist=[], outs_persist_no_cache=[], - checkpoints=[], params=[], fname=None, wdir=None, diff --git a/tests/unit/command/test_stage.py b/tests/unit/command/test_stage.py index 32eb9aef3d..573a942b21 100644 --- a/tests/unit/command/test_stage.py +++ b/tests/unit/command/test_stage.py @@ -42,8 +42,6 @@ def test_stage_add(mocker, dvc, command, parsed_command): "outs-persist", "--outs-persist-no-cache", "outs-persist-no-cache", - "--checkpoints", - "checkpoints", "--always-changed", "--params", "file:param1,param2", @@ -79,7 +77,6 @@ def test_stage_add(mocker, dvc, command, parsed_command): wdir="wdir", outs_persist=["outs-persist"], outs_persist_no_cache=["outs-persist-no-cache"], - checkpoints=["checkpoints"], always_changed=True, external=True, desc="description", diff --git a/tests/unit/repo/experiments/conftest.py b/tests/unit/repo/experiments/conftest.py index af0a5439ad..65b84f366e 100644 --- a/tests/unit/repo/experiments/conftest.py +++ b/tests/unit/repo/experiments/conftest.py @@ -1,53 +1,10 @@ from functools import partial -from textwrap import dedent import pytest from dvc_task.app import FSApp DEFAULT_ITERATIONS = 2 -CHECKPOINT_SCRIPT_FORMAT = dedent( - """\ - import os - import sys - import shutil - - from dvc.api import make_checkpoint - - checkpoint_file = {} - checkpoint_iterations = int({}) - if os.path.exists(checkpoint_file): - with open(checkpoint_file) as fobj: - try: - value = int(fobj.read()) - except ValueError: - value = 0 - else: - with open(checkpoint_file, "w"): - pass - value = 0 - - shutil.copyfile({}, {}) - - if os.getenv("DVC_CHECKPOINT"): - for index in range(checkpoint_iterations): - value += 1 - {} - with open(checkpoint_file, "w") as fobj: - fobj.write(str(value)) - make_checkpoint() -""" -) -CHECKPOINT_SCRIPT = CHECKPOINT_SCRIPT_FORMAT.format( - "sys.argv[1]", "sys.argv[2]", "sys.argv[3]", "sys.argv[4]", "" -) -FAILED_CHECKPOINT_SCRIPT = CHECKPOINT_SCRIPT_FORMAT.format( - "sys.argv[1]", - "sys.argv[2]", - "sys.argv[3]", - "sys.argv[4]", - "if index == (checkpoint_iterations - 2): raise Exception", -) @pytest.fixture @@ -74,48 +31,6 @@ def exp_stage(tmp_dir, scm, dvc, copy_script): return stage -@pytest.fixture -def checkpoint_stage(tmp_dir, scm, dvc, mocker): - mocker.patch("dvc.stage.run.Monitor.AWAIT", 0.01) - - tmp_dir.gen("checkpoint.py", CHECKPOINT_SCRIPT) - tmp_dir.gen("params.yaml", "foo: 1") - stage = dvc.run( - cmd=f"python checkpoint.py foo {DEFAULT_ITERATIONS} params.yaml metrics.yaml", - metrics_no_cache=["metrics.yaml"], - params=["foo"], - checkpoints=["foo"], - deps=["checkpoint.py"], - no_exec=True, - name="checkpoint-file", - ) - scm.add(["dvc.yaml", "checkpoint.py", "params.yaml", ".gitignore"]) - scm.commit("init") - stage.iterations = DEFAULT_ITERATIONS - return stage - - -@pytest.fixture -def failed_checkpoint_stage(tmp_dir, scm, dvc, mocker): - mocker.patch("dvc.stage.run.Monitor.AWAIT", 0.01) - - tmp_dir.gen("checkpoint.py", FAILED_CHECKPOINT_SCRIPT) - tmp_dir.gen("params.yaml", "foo: 1") - stage = dvc.run( - cmd=f"python checkpoint.py foo {DEFAULT_ITERATIONS+2} params.yaml metrics.yaml", - metrics_no_cache=["metrics.yaml"], - params=["foo"], - checkpoints=["foo"], - deps=["checkpoint.py"], - no_exec=True, - name="failed-checkpoint-file", - ) - scm.add(["dvc.yaml", "checkpoint.py", "params.yaml", ".gitignore"]) - scm.commit("init") - stage.iterations = DEFAULT_ITERATIONS - return stage - - @pytest.fixture def failed_exp_stage(tmp_dir, scm, dvc, copy_script): tmp_dir.gen("params.yaml", "foo: 1") diff --git a/tests/unit/repo/experiments/test_show.py b/tests/unit/repo/experiments/test_show.py index 2293e043bb..9e7660ae2a 100644 --- a/tests/unit/repo/experiments/test_show.py +++ b/tests/unit/repo/experiments/test_show.py @@ -1,12 +1,6 @@ -import pytest from funcy import first -from dvc.repo.experiments.show import ( - ExpStatus, - get_branch_names, - move_properties_to_head, - update_names, -) +from dvc.repo.experiments.show import get_branch_names, update_names def test_get_show_branch(tmp_dir, scm, dvc, exp_stage): @@ -33,185 +27,3 @@ def test_get_show_branch(tmp_dir, scm, dvc, exp_stage): assert result[exp_a]["baseline"]["data"] == {"name": new_branch} assert result[baseline]["baseline"]["data"] == {"name": base_branch} assert result[baseline][exp_a]["data"] == {"name": new_branch} - - -@pytest.mark.parametrize( - "res_in,res_out", - [ - ( - { - "1": { - "data": { - "checkpoint_tip": "1", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - "2": { - "data": { - "checkpoint_tip": "1", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "a", - } - }, - "3": { - "data": { - "checkpoint_tip": "3", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - "4": { - "data": { - "checkpoint_tip": "3", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "b", - } - }, - }, - { - "1": { - "data": { - "checkpoint_tip": "1", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "a", - } - }, - "2": { - "data": { - "checkpoint_tip": "1", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - "3": { - "data": { - "checkpoint_tip": "3", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "b", - } - }, - "4": { - "data": { - "checkpoint_tip": "3", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - }, - ), - ( - { - "0": {"error": {"msg": "something"}}, - "1": { - "data": { - "checkpoint_tip": "1", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - "2": { - "data": { - "checkpoint_tip": "1", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "a", - } - }, - "3": {"data": {"name": "b"}}, - }, - { - "0": {"error": {"msg": "something"}}, - "1": { - "data": { - "checkpoint_tip": "1", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "a", - } - }, - "2": { - "data": { - "checkpoint_tip": "1", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - "3": {"data": {"name": "b"}}, - }, - ), - ( - { - "1": { - "data": { - "checkpoint_tip": "1", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - "2": { - "data": { - "checkpoint_tip": "1", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "a", - } - }, - "3": { - "data": { - "checkpoint_tip": "1", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "b", - } - }, - "4": { - "data": { - "checkpoint_tip": "1", - "executor": "local", - "status": ExpStatus.Running.name, - } - }, - }, - { - "1": { - "data": { - "checkpoint_tip": "1", - "executor": "local", - "status": ExpStatus.Running.name, - "name": "a", - } - }, - "2": { - "data": { - "checkpoint_tip": "1", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - "3": { - "data": { - "checkpoint_tip": "1", - "executor": None, - "status": ExpStatus.Success.name, - "name": "b", - } - }, - "4": { - "data": { - "checkpoint_tip": "1", - "executor": None, - "status": ExpStatus.Success.name, - } - }, - }, - ), - ], -) -def test_move_properties_to_head(res_in, res_out): - move_properties_to_head({"baseline": res_in}) - assert res_in == res_out diff --git a/tests/unit/stage/test_stage.py b/tests/unit/stage/test_stage.py index 739527ae95..10f9f44e0e 100644 --- a/tests/unit/stage/test_stage.py +++ b/tests/unit/stage/test_stage.py @@ -121,22 +121,3 @@ def test_external_outs(tmp_path_factory, dvc): create_stage(Stage, dvc, "path.dvc", outs=["remote://myremote/foo"]) create_stage(Stage, dvc, "path.dvc", outs=[os.fspath(foo)], external=True) - - -def test_env(dvc, mocker): - from dvc.env import DVC_ROOT - from dvc.exceptions import DvcException - from dvc.stage import create_stage - - stage = create_stage(Stage, dvc, "path.dvc", outs=["foo", "bar"]) - - mocker.patch.object(stage, "_read_env", return_value={"foo": "foo"}) - env = stage.env() - assert env == {DVC_ROOT: dvc.root_dir, "foo": "foo"} - - def mock_read_env(out, **kwargs): - return {"foo": str(out)} - - mocker.patch.object(stage, "_read_env", mock_read_env) - with pytest.raises(DvcException, match="Conflicting values for env variable"): - _ = stage.env()