diff --git a/dvc/api/__init__.py b/dvc/api/__init__.py index dc34f5c4626..b3ba6bebe08 100644 --- a/dvc/api/__init__.py +++ b/dvc/api/__init__.py @@ -2,7 +2,7 @@ from .data import open # pylint: disable=redefined-builtin from .data import get_url, read -from .experiments import exp_save, exp_show, make_checkpoint +from .experiments import exp_save, exp_show from .scm import all_branches, all_commits, all_tags from .show import metrics_show, params_show @@ -13,7 +13,6 @@ "exp_save", "exp_show", "get_url", - "make_checkpoint", "open", "params_show", "metrics_show", diff --git a/dvc/api/experiments.py b/dvc/api/experiments.py index 2032539cd4a..ba9ff750ad9 100644 --- a/dvc/api/experiments.py +++ b/dvc/api/experiments.py @@ -1,40 +1,9 @@ -import builtins -import os -from time import sleep from typing import Dict, List, Optional, Union from rich.text import Text -from dvc.env import DVC_CHECKPOINT, DVC_ROOT from dvc.repo import Repo from dvc.repo.experiments.show import tabulate -from dvc.stage.monitor import CheckpointTask - - -def make_checkpoint(): - """ - Signal DVC to create a checkpoint experiment. - - If the current process is being run from DVC, this function will block - until DVC has finished creating the checkpoint. Otherwise, this function - will return immediately. - """ - if os.getenv(DVC_CHECKPOINT) is None: - return - - root_dir = os.getenv(DVC_ROOT, Repo.find_root()) - signal_file = os.path.join( - root_dir, Repo.DVC_DIR, "tmp", CheckpointTask.SIGNAL_FILE - ) - - with builtins.open(signal_file, "w", encoding="utf-8") as fobj: - # NOTE: force flushing/writing empty file to disk, otherwise when - # run in certain contexts (pytest) file may not actually be written - fobj.write("") - fobj.flush() - os.fsync(fobj.fileno()) - while os.path.exists(signal_file): - sleep(0.1) def exp_save( diff --git a/dvc/commands/experiments/run.py b/dvc/commands/experiments/run.py index 8cc1121abea..f44f5041385 100644 --- a/dvc/commands/experiments/run.py +++ b/dvc/commands/experiments/run.py @@ -1,12 +1,9 @@ import argparse import logging -from dvc.cli import completion from dvc.cli.utils import append_doc_link from dvc.commands.repro import CmdRepro from dvc.commands.repro import add_arguments as add_repro_arguments -from dvc.exceptions import InvalidArgumentError -from dvc.ui import ui logger = logging.getLogger(__name__) @@ -15,25 +12,12 @@ class CmdExperimentsRun(CmdRepro): def run(self): from dvc.compare import show_metrics - if self.args.checkpoint_resume: - if self.args.reset: - raise InvalidArgumentError("--reset and --rev are mutually exclusive.") - if not (self.args.queue or self.args.tmp_dir): - raise InvalidArgumentError( - "--rev can only be used in conjunction with --queue or --temp." - ) - - if self.args.reset: - ui.write("Any existing checkpoints will be reset and re-run.") - results = self.repo.experiments.run( name=self.args.name, queue=self.args.queue, run_all=self.args.run_all, jobs=self.args.jobs, params=self.args.set_param, - checkpoint_resume=self.args.checkpoint_resume, - reset=self.args.reset, tmp_dir=self.args.tmp_dir, machine=self.args.machine, copy_paths=self.args.copy_paths, @@ -59,22 +43,6 @@ def add_parser(experiments_subparsers, parent_parser): formatter_class=argparse.RawDescriptionHelpFormatter, ) _add_run_common(experiments_run_parser) - experiments_run_parser.add_argument( - "-r", - "--rev", - type=str, - dest="checkpoint_resume", - help=( - "Continue the specified checkpoint experiment. Can only be used " - "in conjunction with --queue or --temp." - ), - metavar="", - ).complete = completion.EXPERIMENT - experiments_run_parser.add_argument( - "--reset", - action="store_true", - help="Reset existing checkpoints and restart the experiment.", - ) experiments_run_parser.set_defaults(func=CmdExperimentsRun) diff --git a/dvc/commands/experiments/show.py b/dvc/commands/experiments/show.py index 0636e5e710d..cfb8b49b9bf 100644 --- a/dvc/commands/experiments/show.py +++ b/dvc/commands/experiments/show.py @@ -25,9 +25,6 @@ experiment_types = { - "checkpoint_tip": "│ ╓", - "checkpoint_commit": "│ ╟", - "checkpoint_base": "├─╨", "branch_commit": "├──", "branch_base": "└──", "baseline": "", diff --git a/dvc/commands/run.py b/dvc/commands/run.py index a5e74f11f34..a08b91e9e63 100644 --- a/dvc/commands/run.py +++ b/dvc/commands/run.py @@ -22,7 +22,6 @@ def run(self): self.args.plots_no_cache, self.args.outs_persist, self.args.outs_persist_no_cache, - self.args.checkpoints, self.args.params, self.args.command, ] @@ -39,7 +38,7 @@ def run(self): { "cmd": parse_cmd(self.args.command), "fname": kwargs.pop("file"), - "no_exec": self.args.no_exec or bool(self.args.checkpoints), + "no_exec": self.args.no_exec, "run_cache": not kwargs.pop("no_run_cache"), } ) diff --git a/dvc/commands/stage.py b/dvc/commands/stage.py index ec5906a81f5..dffcce3fe96 100644 --- a/dvc/commands/stage.py +++ b/dvc/commands/stage.py @@ -195,17 +195,6 @@ def _add_common_args(parser): help="Declare output file or directory (do not put into DVC cache).", metavar="", ).complete = completion.FILE - parser.add_argument( - "-c", - "--checkpoints", - action="append", - default=[], - help=( - "Declare checkpoint output file or directory for 'dvc exp run'. " - "Not compatible with 'dvc repro'." - ), - metavar="", - ).complete = completion.FILE parser.add_argument( "--external", action="store_true", diff --git a/dvc/env.py b/dvc/env.py index 081ec9d79e0..fc2ad3f15e6 100644 --- a/dvc/env.py +++ b/dvc/env.py @@ -1,5 +1,3 @@ -DVCLIVE_RESUME = "DVCLIVE_RESUME" -DVC_CHECKPOINT = "DVC_CHECKPOINT" DVC_DAEMON = "DVC_DAEMON" DVC_EXP_AUTO_PUSH = "DVC_EXP_AUTO_PUSH" DVC_EXP_BASELINE_REV = "DVC_EXP_BASELINE_REV" diff --git a/dvc/output.py b/dvc/output.py index 2bf387f82fc..10f818fa916 100644 --- a/dvc/output.py +++ b/dvc/output.py @@ -91,7 +91,6 @@ def loadd_from(stage, d_list): metric = d.pop(Output.PARAM_METRIC, False) plot = d.pop(Output.PARAM_PLOT, False) persist = d.pop(Output.PARAM_PERSIST, False) - checkpoint = d.pop(Output.PARAM_CHECKPOINT, False) remote = d.pop(Output.PARAM_REMOTE, None) annot = {field: d.pop(field, None) for field in ANNOTATION_FIELDS} files = d.pop(Output.PARAM_FILES, None) @@ -105,7 +104,6 @@ def loadd_from(stage, d_list): metric=metric, plot=plot, persist=persist, - checkpoint=checkpoint, remote=remote, **annot, files=files, @@ -122,7 +120,6 @@ def loads_from( metric=False, plot=False, persist=False, - checkpoint=False, remote=None, push=True, ): @@ -135,7 +132,6 @@ def loads_from( metric=metric, plot=plot, persist=persist, - checkpoint=checkpoint, remote=remote, push=push, ) @@ -191,7 +187,6 @@ def load_from_pipeline(stage, data, typ="outs"): [ Output.PARAM_CACHE, Output.PARAM_PERSIST, - Output.PARAM_CHECKPOINT, Output.PARAM_REMOTE, Output.PARAM_PUSH, *ANNOTATION_FIELDS, @@ -286,7 +281,6 @@ class Output: PARAM_PATH = "path" PARAM_CACHE = "cache" - PARAM_CHECKPOINT = "checkpoint" PARAM_FILES = "files" PARAM_METRIC = "metric" PARAM_METRIC_TYPE = "type" @@ -327,7 +321,6 @@ def __init__( # noqa: PLR0913 metric=False, plot=False, persist=False, - checkpoint=False, desc=None, type=None, # noqa: A002, pylint: disable=redefined-builtin labels=None, @@ -394,7 +387,6 @@ def __init__( # noqa: PLR0913 self.metric = False if self.IS_DEPENDENCY else metric self.plot = False if self.IS_DEPENDENCY else plot self.persist = persist - self.checkpoint = checkpoint self.can_push = push self.fs_path = self._parse_path(self.fs, fs_path) @@ -822,9 +814,6 @@ def dumpd(self, **kwargs): # noqa: C901, PLR0912 if self.persist: ret[self.PARAM_PERSIST] = self.persist - if self.checkpoint: - ret[self.PARAM_CHECKPOINT] = self.checkpoint - if self.remote: ret[self.PARAM_REMOTE] = self.remote @@ -894,7 +883,6 @@ def checkout( relink: bool = False, filter_info: Optional[str] = None, allow_missing: bool = False, - checkpoint_reset: bool = False, **kwargs, ) -> Optional[Tuple[bool, Optional[bool]]]: # callback passed act as a aggregate callback. @@ -914,11 +902,6 @@ def relative_update(self, inc: int = 1) -> None: # backward compatibility return None - if self.checkpoint and checkpoint_reset: - if self.exists: - self.remove() - return None - added = not self.exists try: @@ -935,7 +918,7 @@ def relative_update(self, inc: int = 1) -> None: **kwargs, ) except CheckoutError: - if allow_missing or self.checkpoint: + if allow_missing: return None raise self.set_exec() @@ -1436,7 +1419,6 @@ def _merge_dir_version_meta(self, other: "Output"): Required(Output.PARAM_PATH): str, Output.PARAM_PLOT: bool, Output.PARAM_PERSIST: bool, - Output.PARAM_CHECKPOINT: bool, Output.PARAM_CLOUD: CLOUD_SCHEMA, } diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 959ce6fc503..6ea4078e91b 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -5,7 +5,6 @@ from funcy import chain, first -from dvc.exceptions import DvcException from dvc.ui import ui from dvc.utils import relpath from dvc.utils.objects import cached_property @@ -22,7 +21,6 @@ CELERY_FAILED_STASH, CELERY_STASH, EXEC_APPLY, - EXEC_CHECKPOINT, EXEC_NAMESPACE, EXPS_NAMESPACE, WORKSPACE_STASH, @@ -48,10 +46,7 @@ class Experiments: repo (dvc.repo.Repo): repo instance that these experiments belong to. """ - BRANCH_RE = re.compile( - r"^(?P[a-f0-9]{7})-(?P[a-f0-9]+)" - r"(?P-checkpoint)?$" - ) + BRANCH_RE = re.compile(r"^(?P[a-f0-9]{7})-(?P[a-f0-9]+)") def __init__(self, repo): from dvc.scm import NoSCMError @@ -138,47 +133,18 @@ def reproduce_one( def queue_one( self, queue: "BaseStashQueue", - checkpoint_resume: Optional[str] = None, - reset: bool = False, **kwargs, ) -> "QueueEntry": """Queue a single experiment.""" - if reset: - self.reset_checkpoints() - if kwargs.pop("machine", None) is not None: # TODO: decide how to handle queued remote execution raise NotImplementedError - if checkpoint_resume: - from dvc.scm import resolve_rev - - resume_rev = resolve_rev(self.scm, checkpoint_resume) - try: - self.check_baseline(resume_rev) - checkpoint_resume = resume_rev - except BaselineMismatchError as exc: - raise DvcException( - f"Cannot resume from '{checkpoint_resume}' as it is not " - "derived from your current workspace." - ) from exc - else: - checkpoint_resume = self._workspace_resume_rev() - return self.new( queue, - checkpoint_resume=checkpoint_resume, - reset=reset, **kwargs, ) - def _workspace_resume_rev(self) -> Optional[str]: - last_checkpoint = self._get_last_checkpoint() - last_applied = self._get_last_applied() - if last_checkpoint and last_applied: - return last_applied - return None - def reproduce_celery( # noqa: C901 self, entries: Optional[Iterable["QueueEntry"]] = None, **kwargs ) -> Dict[str, str]: @@ -257,17 +223,12 @@ def new( self, queue: "BaseStashQueue", *args, - checkpoint_resume: Optional[str] = None, **kwargs, ) -> "QueueEntry": """Create and enqueue a new experiment. Experiment will be derived from the current workspace. """ - if checkpoint_resume is not None: - return self._resume_checkpoint( - queue, *args, resume_rev=checkpoint_resume, **kwargs - ) name = kwargs.get("name", None) baseline_sha = kwargs.get("baseline_rev") or self.repo.scm.get_rev() @@ -281,59 +242,6 @@ def new( return queue.put(*args, **kwargs) - def _resume_checkpoint( - self, - queue: "BaseStashQueue", - *args, - resume_rev: Optional[str] = None, - **kwargs, - ) -> "QueueEntry": - """Create and queue a resumed checkpoint experiment.""" - assert resume_rev - - branch: Optional[str] = None - try: - allow_multiple = bool(kwargs.get("params", None)) - branch = self.get_branch_by_rev(resume_rev, allow_multiple=allow_multiple) - if not branch: - raise DvcException( - f"Could not find checkpoint experiment '{resume_rev[:7]}'" - ) - baseline_rev = self._get_baseline(branch) - except MultipleBranchError as exc: - baselines = { - info.baseline_sha for info in exc.ref_infos if info.baseline_sha - } - if len(baselines) == 1: - baseline_rev = baselines.pop() - else: - raise - - logger.debug( - "Checkpoint run from '%s' with baseline '%s'", - resume_rev[:7], - baseline_rev, - ) - return queue.put( - *args, - resume_rev=resume_rev, - baseline_rev=baseline_rev, - branch=branch, - **kwargs, - ) - - def _get_last_checkpoint(self) -> Optional[str]: - try: - last_checkpoint = self.scm.get_ref(EXEC_CHECKPOINT) - if last_checkpoint: - self.check_baseline(last_checkpoint) - return last_checkpoint - except BaselineMismatchError: - # If HEAD has moved since the the last checkpoint run, - # the specified checkpoint is no longer relevant - self.scm.remove_ref(EXEC_CHECKPOINT) - return None - def _get_last_applied(self) -> Optional[str]: try: last_applied = self.scm.get_ref(EXEC_APPLY) @@ -346,10 +254,6 @@ def _get_last_applied(self) -> Optional[str]: self.scm.remove_ref(EXEC_APPLY) return None - def reset_checkpoints(self): - self.scm.remove_ref(EXEC_CHECKPOINT) - self.scm.remove_ref(EXEC_APPLY) - @unlocked_repo def _reproduce_queue( self, diff --git a/dvc/repo/experiments/exceptions.py b/dvc/repo/experiments/exceptions.py index d83d4a00054..09568ef7318 100644 --- a/dvc/repo/experiments/exceptions.py +++ b/dvc/repo/experiments/exceptions.py @@ -31,21 +31,6 @@ def __init__(self, name: str, command: str = "run"): self.name = name -class CheckpointExistsError(DvcException): - def __init__(self, name: str): - msg = ( - "Reproduced checkpoint experiment conflicts with existing " - f"experiment '{name}'. To restart (and overwrite) the existing " - "experiment run:\n\n" - "\tdvc exp run -f ...\n\n" - "To resume the existing experiment, run:\n\n" - f"\tdvc exp apply {name}\n" - "\tdvc exp run\n" - ) - super().__init__(msg) - self.name = name - - class InvalidExpRefError(DvcException): def __init__(self, ref): super().__init__(f"'{ref}' is not a valid experiment refname.") diff --git a/dvc/repo/experiments/executor/base.py b/dvc/repo/experiments/executor/base.py index 5cbe248b2a4..b454561c23e 100644 --- a/dvc/repo/experiments/executor/base.py +++ b/dvc/repo/experiments/executor/base.py @@ -6,7 +6,6 @@ from contextlib import contextmanager from dataclasses import asdict, dataclass from enum import IntEnum -from functools import partial from itertools import chain from typing import ( TYPE_CHECKING, @@ -29,18 +28,12 @@ from dvc.env import DVC_EXP_AUTO_PUSH, DVC_EXP_GIT_REMOTE from dvc.exceptions import DvcException -from dvc.repo.experiments.exceptions import CheckpointExistsError, ExperimentExistsError -from dvc.repo.experiments.refs import ( - EXEC_BASELINE, - EXEC_BRANCH, - EXEC_CHECKPOINT, - ExpRefInfo, -) +from dvc.repo.experiments.exceptions import ExperimentExistsError +from dvc.repo.experiments.refs import EXEC_BASELINE, EXEC_BRANCH, ExpRefInfo from dvc.repo.experiments.utils import to_studio_params from dvc.repo.metrics.show import _collect_top_level_metrics from dvc.repo.params.show import _collect_top_level_params from dvc.stage.serialize import to_lockfile -from dvc.ui import ui from dvc.utils import dict_sha256, env2bool, relpath from dvc.utils.fs import remove from dvc.utils.studio import env_to_config @@ -381,7 +374,7 @@ def fetch_exps( dest_scm: "Git", refs: List[str], force: bool = False, - on_diverged: Optional[Callable[[str, bool], None]] = None, + on_diverged: Optional[Callable[[str], None]] = None, **kwargs, ) -> Iterable[str]: """Fetch reproduced experiment refs into the specified SCM. @@ -390,41 +383,32 @@ def fetch_exps( dest_scm: Destination Git instance. refs: reference names to be fetched from the remotes. force: If True, diverged refs will be overwritten - on_diverged: Callback in the form on_diverged(ref, is_checkpoint) + on_diverged: Callback in the form on_diverged(ref) to be called when an experiment ref has diverged. Extra kwargs will be passed into the remote git client. """ - if EXEC_CHECKPOINT in refs: - refs.remove(EXEC_CHECKPOINT) - has_checkpoint = True - else: - has_checkpoint = False - def on_diverged_ref(orig_ref: str, new_rev: str): if force: logger.debug("Replacing existing experiment '%s'", orig_ref) return True if on_diverged: - return on_diverged(orig_ref, has_checkpoint) + return on_diverged(orig_ref) - self._raise_ref_conflict(dest_scm, orig_ref, new_rev, has_checkpoint) + self._raise_ref_conflict(dest_scm, orig_ref, new_rev) logger.debug("Reproduced existing experiment '%s'", orig_ref) return False # fetch experiments try: refspecs = [f"{ref}:{ref}" for ref in refs] - # update last run checkpoint (if it exists) - if has_checkpoint: - refspecs.append(f"{EXEC_CHECKPOINT}:{EXEC_CHECKPOINT}") dest_scm.fetch_refspecs( self.git_url, refspecs, on_diverged=on_diverged_ref, - force=force or has_checkpoint, + force=force, **kwargs, ) except SCMError: @@ -521,19 +505,6 @@ def filter_pipeline(stages): repro_dry = kwargs.get("dry") - # NOTE: checkpoint outs are handled as a special type of persist - # out: - # - # - checkpoint out may not yet exist if this is the first time this - # experiment has been run, this is not an error condition for - # experiments - # - if experiment was run with --reset, the checkpoint out will be - # removed at the start of the experiment (regardless of any - # dvc.lock entry for the checkpoint out) - # - if run without --reset, the checkpoint out will be checked out - # using any hash present in dvc.lock (or removed if no entry - # exists in dvc.lock) - checkpoint_reset: bool = kwargs.pop("reset", False) if not repro_dry: dvc_checkout( dvc, @@ -542,23 +513,13 @@ def filter_pipeline(stages): force=True, quiet=True, allow_missing=True, - checkpoint_reset=checkpoint_reset, recursive=kwargs.get("recursive", False), ) - checkpoint_func = partial( - cls.checkpoint_callback, - dvc, - dvc.scm, - info.name, - repro_force or checkpoint_reset, - ) - stages = dvc_reproduce( dvc, *args, on_unchanged=filter_pipeline, - checkpoint_func=checkpoint_func, **kwargs, ) if paths := cls._get_top_level_paths(dvc): @@ -570,7 +531,6 @@ def filter_pipeline(stages): ref, exp_ref, repro_force = cls._repro_commit( dvc, info, - stages, exp_hash, auto_push, git_remote, @@ -591,20 +551,17 @@ def _repro_commit( cls, dvc, info, - stages, exp_hash, auto_push, git_remote, repro_force, message: Optional[str] = None, ) -> Tuple[Optional[str], Optional["ExpRefInfo"], bool]: - is_checkpoint = any(stage.is_checkpoint for stage in stages) cls.commit( dvc.scm, exp_hash, exp_name=info.name, force=repro_force, - checkpoint=is_checkpoint, message=message, ) if auto_push: @@ -639,7 +596,6 @@ def _repro_dvc( # noqa: C901 from dvc_studio_client.post_live_metrics import post_live_metrics from dvc.repo import Repo - from dvc.stage.monitor import CheckpointKilledError with Repo(os.path.join(info.root_dir, info.dvc_dir)) as dvc: info.status = TaskStatus.RUNNING @@ -678,9 +634,6 @@ def _repro_dvc( # noqa: C901 logger.debug("Running repro in '%s'", os.getcwd()) yield dvc info.status = TaskStatus.SUCCESS - except CheckpointKilledError: - info.status = TaskStatus.FAILED - raise except DvcException: if log_errors: logger.exception("") @@ -745,24 +698,6 @@ def _auto_push( exc, ) - @classmethod - def checkpoint_callback( - cls, - dvc: "Repo", - scm: "Git", - name: Optional[str], - force: bool, - unchanged: Iterable["PipelineStage"], - stages: Iterable["PipelineStage"], - ): - exp_hash = cls.hash_exp(list(stages) + list(unchanged)) - exp_rev = cls.commit(scm, exp_hash, exp_name=name, force=force, checkpoint=True) - - if env2bool(DVC_EXP_AUTO_PUSH): - git_remote = os.getenv(DVC_EXP_GIT_REMOTE) - cls._auto_push(dvc, scm, git_remote) - ui.write(f"Checkpoint experiment iteration '{exp_rev[:7]}'.") - @classmethod def commit( cls, @@ -770,7 +705,6 @@ def commit( exp_hash: str, exp_name: Optional[str] = None, force: bool = False, - checkpoint: bool = False, message: Optional[str] = None, ): """Commit stages as an experiment and return the commit SHA.""" @@ -805,23 +739,20 @@ def commit( scm.commit(message, no_verify=True) new_rev = scm.get_rev() if check_conflict: - new_rev = cls._raise_ref_conflict(scm, branch, new_rev, checkpoint) + new_rev = cls._raise_ref_conflict(scm, branch, new_rev) else: scm.set_ref(branch, new_rev, old_ref=old_ref) scm.set_ref(EXEC_BRANCH, branch, symbolic=True) - if checkpoint: - scm.set_ref(EXEC_CHECKPOINT, new_rev) + return new_rev @staticmethod - def _raise_ref_conflict(scm, ref, new_rev, checkpoint): + def _raise_ref_conflict(scm, ref, new_rev): # If this commit is a duplicate of the existing commit at 'ref', return # the existing commit. Otherwise, error out and require user to re-run # with --force as needed orig_rev = scm.get_ref(ref) if scm.diff(orig_rev, new_rev): - if checkpoint: - raise CheckpointExistsError(ref) raise ExperimentExistsError(ref) return orig_rev diff --git a/dvc/repo/experiments/executor/local.py b/dvc/repo/experiments/executor/local.py index f97ceaa4e1a..03a4ac75389 100644 --- a/dvc/repo/experiments/executor/local.py +++ b/dvc/repo/experiments/executor/local.py @@ -10,10 +10,8 @@ from dvc.lock import LockError from dvc.repo.experiments.refs import ( - EXEC_APPLY, EXEC_BASELINE, EXEC_BRANCH, - EXEC_CHECKPOINT, EXEC_HEAD, EXEC_MERGE, EXEC_NAMESPACE, @@ -124,9 +122,6 @@ def init_git( if self.scm.get_ref(EXEC_BRANCH): self.scm.remove_ref(EXEC_BRANCH) - if self.scm.get_ref(EXEC_CHECKPOINT): - self.scm.remove_ref(EXEC_CHECKPOINT) - # checkout EXEC_HEAD and apply EXEC_MERGE on top of it without # committing assert isinstance(self.scm, Git) @@ -189,7 +184,6 @@ class WorkspaceExecutor(BaseLocalExecutor): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._detach_stack = ExitStack() - self._orig_checkpoint = self.scm.get_ref(EXEC_CHECKPOINT) @classmethod def from_stash_entry( @@ -251,6 +245,3 @@ def cleanup(self, infofile: Optional[str] = None): self.scm.remove_ref(EXEC_MERGE) if self.scm.get_ref(EXEC_BRANCH): self.scm.remove_ref(EXEC_BRANCH) - checkpoint = self.scm.get_ref(EXEC_CHECKPOINT) - if checkpoint and checkpoint != self._orig_checkpoint: - self.scm.set_ref(EXEC_APPLY, checkpoint) diff --git a/dvc/repo/experiments/executor/ssh.py b/dvc/repo/experiments/executor/ssh.py index 1cef8bf22d9..85431dbeaf9 100644 --- a/dvc/repo/experiments/executor/ssh.py +++ b/dvc/repo/experiments/executor/ssh.py @@ -11,7 +11,6 @@ from dvc.repo.experiments.refs import ( EXEC_BASELINE, EXEC_BRANCH, - EXEC_CHECKPOINT, EXEC_HEAD, EXEC_MERGE, EXEC_NAMESPACE, @@ -162,7 +161,6 @@ def init_git( self._ssh_cmd(fs, f"git symbolic-ref {EXEC_BRANCH} {branch}") else: self._ssh_cmd(fs, f"git symbolic-ref -d {EXEC_BRANCH}", check=False) - self._ssh_cmd(fs, f"git update-ref -d {EXEC_CHECKPOINT}", check=False) # checkout EXEC_HEAD and apply EXEC_MERGE on top of it without # committing diff --git a/dvc/repo/experiments/queue/base.py b/dvc/repo/experiments/queue/base.py index 0df531d69be..adf83efb5fc 100644 --- a/dvc/repo/experiments/queue/base.py +++ b/dvc/repo/experiments/queue/base.py @@ -21,10 +21,9 @@ from funcy import retry from dvc.dependency import ParamsDependency -from dvc.env import DVC_EXP_BASELINE_REV, DVC_EXP_NAME, DVCLIVE_RESUME -from dvc.exceptions import DvcException +from dvc.env import DVC_EXP_BASELINE_REV, DVC_EXP_NAME from dvc.lock import LockError -from dvc.repo.experiments.exceptions import CheckpointExistsError, ExperimentExistsError +from dvc.repo.experiments.exceptions import ExperimentExistsError from dvc.repo.experiments.executor.base import BaseExecutor from dvc.repo.experiments.executor.local import WorkspaceExecutor from dvc.repo.experiments.refs import ExpRefInfo @@ -32,11 +31,9 @@ from dvc.repo.experiments.utils import ( EXEC_PID_DIR, EXEC_TMP_DIR, - exp_refs_by_rev, get_exp_rwlock, get_random_exp_name, ) -from dvc.ui import ui from dvc.utils.objects import cached_property from dvc.utils.studio import config_to_env @@ -300,11 +297,10 @@ def logs( output. """ - def _stash_exp( # noqa: PLR0915, C901 + def _stash_exp( # noqa: C901 self, *args, params: Optional[Dict[str, List[str]]] = None, - resume_rev: Optional[str] = None, baseline_rev: Optional[str] = None, branch: Optional[str] = None, name: Optional[str] = None, @@ -315,7 +311,6 @@ def _stash_exp( # noqa: PLR0915, C901 Args: params: Dict mapping paths to `Hydra Override`_ patterns, provided via `exp run --set-param`. - resume_rev: Optional checkpoint resume rev. baseline_rev: Optional baseline rev for this experiment, defaults to the current SCM rev. branch: Optional experiment branch name. If specified, the @@ -338,17 +333,6 @@ def _stash_exp( # noqa: PLR0915, C901 if workspace: self.stash.apply(workspace) - if resume_rev: - # move HEAD to the resume rev so that the stashed diff - # only contains changes relative to resume rev - stash_head = resume_rev - self.scm.set_ref( - "HEAD", - resume_rev, - message=f"dvc: resume from HEAD {resume_rev[:7]}", - ) - self.scm.reset() - # update experiment params from command line if params: self._update_params(params) @@ -357,60 +341,6 @@ def _stash_exp( # noqa: PLR0915, C901 # & tempdir runs self._stash_commit_deps(*args, **kwargs) - if resume_rev: - if branch: - branch_name = ExpRefInfo.from_ref(branch).name - else: - branch_name = f"{resume_rev[:7]}" - if self.scm.is_dirty(untracked_files=False): - ui.write( - ( - "Modified checkpoint experiment based on " - f"'{branch_name}' will be created" - ), - ) - branch = None - elif not branch or self.scm.get_ref(branch) != resume_rev: - err_msg = [ - "Nothing to do for unchanged checkpoint " - f"'{resume_rev[:7]}'. " - ] - if branch: - err_msg.append( - "To resume from the head of this " - "experiment, use " - f"'dvc exp apply {branch_name}'." - ) - else: - names = [ - ref_info.name - for ref_info in exp_refs_by_rev( - self.scm, resume_rev - ) - ] - if len(names) > 3: - names[3:] = [f"... ({len(names) - 3} more)"] - err_msg.append( - "To resume an experiment containing this " - "checkpoint, apply one of these heads:\n" - "\t{}".format(", ".join(names)) - ) - raise DvcException("".join(err_msg)) - else: - ui.write( - "Existing checkpoint experiment " - f"'{branch_name}' will be resumed" - ) - if name: - logger.warning( - ( - "Ignoring option '--name %s' for resumed " - "experiment. Existing experiment name will" - "be preserved instead." - ), - name, - ) - # save additional repro command line arguments run_env = { DVC_EXP_BASELINE_REV: baseline_rev, @@ -418,8 +348,6 @@ def _stash_exp( # noqa: PLR0915, C901 if not name: name = get_random_exp_name(self.scm, baseline_rev) run_env[DVC_EXP_NAME] = name - if resume_rev: - run_env[DVCLIVE_RESUME] = "1" # save studio config to read later by dvc and dvclive studio_config = get_studio_config( @@ -445,12 +373,6 @@ def _stash_exp( # noqa: PLR0915, C901 baseline_rev[:7], ) finally: - if resume_rev: - # NOTE: this set_ref + reset() is equivalent to - # `git reset orig_head` (our SCM reset() only operates - # on HEAD rather than any arbitrary commit) - self.scm.set_ref("HEAD", orig_head, message="dvc: restore HEAD") - self.scm.reset() # Revert any of our changes before prior unstashing self.scm.reset(hard=True) @@ -633,10 +555,8 @@ def collect_git( ) -> Dict[str, str]: results = {} - def on_diverged(ref: str, checkpoint: bool): + def on_diverged(ref: str): ref_info = ExpRefInfo.from_ref(ref) - if checkpoint: - raise CheckpointExistsError(ref_info.name) raise ExperimentExistsError(ref_info.name) refs = get_remote_executor_refs(exp.scm, executor.git_url) diff --git a/dvc/repo/experiments/queue/tempdir.py b/dvc/repo/experiments/queue/tempdir.py index 64e3197a145..f98596fa7c8 100644 --- a/dvc/repo/experiments/queue/tempdir.py +++ b/dvc/repo/experiments/queue/tempdir.py @@ -100,8 +100,6 @@ def _reproduce_entry( message: Optional[str] = None, **kwargs, ) -> Dict[str, Dict[str, str]]: - from dvc.stage.monitor import CheckpointKilledError - results: Dict[str, Dict[str, str]] = defaultdict(dict) exec_name = self._EXEC_NAME or entry.stash_rev infofile = self.get_infofile_path(exec_name) @@ -122,12 +120,6 @@ def _reproduce_entry( results[rev].update( self.collect_executor(self.repo.experiments, executor, exec_result) ) - except CheckpointKilledError: - results[rev].update( - self.collect_executor(self.repo.experiments, executor, exec_result) - ) - - return results except DvcException: raise except Exception as exc: # noqa: BLE001 diff --git a/dvc/repo/experiments/queue/utils.py b/dvc/repo/experiments/queue/utils.py index e1c9b8b3dd9..45afdfc44c8 100644 --- a/dvc/repo/experiments/queue/utils.py +++ b/dvc/repo/experiments/queue/utils.py @@ -4,12 +4,7 @@ from scmrepo.exceptions import SCMError from dvc.repo.experiments.executor.base import ExecutorInfo, TaskStatus -from dvc.repo.experiments.refs import ( - EXEC_CHECKPOINT, - EXEC_NAMESPACE, - EXPS_NAMESPACE, - EXPS_STASH, -) +from dvc.repo.experiments.refs import EXEC_NAMESPACE, EXPS_NAMESPACE, EXPS_STASH from dvc.repo.experiments.utils import get_exp_rwlock, iter_remote_refs logger = logging.getLogger(__name__) @@ -33,9 +28,7 @@ def get_remote_executor_refs(scm: "Git", remote_url: str) -> List[str]: remote_url, base=EXPS_NAMESPACE, ): - if ref == EXEC_CHECKPOINT or ( - not ref.startswith(EXEC_NAMESPACE) and ref != EXPS_STASH - ): + if not ref.startswith(EXEC_NAMESPACE) and ref != EXPS_STASH: refs.append(ref) return refs @@ -67,7 +60,7 @@ def fetch_running_exp_from_temp_dir( result[rev] = info.asdict() if info.git_url and fetch_refs and info.status > TaskStatus.PREPARING: - def on_diverged(_ref: str, _checkpoint: bool): + def on_diverged(_ref: str): return True executor = TempDirExecutor.from_info(info) diff --git a/dvc/repo/experiments/queue/workspace.py b/dvc/repo/experiments/queue/workspace.py index 1e4d29fe566..4f4980474df 100644 --- a/dvc/repo/experiments/queue/workspace.py +++ b/dvc/repo/experiments/queue/workspace.py @@ -101,7 +101,6 @@ def _reproduce_entry( self, entry: QueueEntry, executor: "BaseExecutor", **kwargs ) -> Dict[str, Dict[str, str]]: kwargs.pop("copy_paths", None) - from dvc.stage.monitor import CheckpointKilledError from dvc_task.proc.process import ProcessInfo results: Dict[str, Dict[str, str]] = defaultdict(dict) @@ -127,9 +126,6 @@ def _reproduce_entry( results[rev].update( self.collect_executor(self.repo.experiments, executor, exec_result) ) - except CheckpointKilledError: - # Checkpoint errors have already been logged - return {} except DvcException: raise except Exception as exc: # noqa: BLE001 diff --git a/dvc/repo/experiments/refs.py b/dvc/repo/experiments/refs.py index 7f083633031..5497a25c779 100644 --- a/dvc/repo/experiments/refs.py +++ b/dvc/repo/experiments/refs.py @@ -15,7 +15,6 @@ CELERY_FAILED_STASH = f"{CELERY_NAMESPACE}/failed" EXEC_NAMESPACE = f"{EXPS_NAMESPACE}/exec" EXEC_APPLY = f"{EXEC_NAMESPACE}/EXEC_APPLY" -EXEC_CHECKPOINT = f"{EXEC_NAMESPACE}/EXEC_CHECKPOINT" EXEC_BRANCH = f"{EXEC_NAMESPACE}/EXEC_BRANCH" EXEC_BASELINE = f"{EXEC_NAMESPACE}/EXEC_BASELINE" EXEC_HEAD = f"{EXEC_NAMESPACE}/EXEC_HEAD" diff --git a/dvc/repo/experiments/run.py b/dvc/repo/experiments/run.py index 284e3002be5..d3c71494793 100644 --- a/dvc/repo/experiments/run.py +++ b/dvc/repo/experiments/run.py @@ -87,9 +87,6 @@ def run( # noqa: C901, PLR0912 else: sweeps = [path_overrides] - if not kwargs.get("checkpoint_resume", None): - kwargs["reset"] = True - for idx, sweep_overrides in enumerate(sweeps): if hydra_sweep and name_prefix is not None: kwargs["name"] = f"{name_prefix}-{idx+1}" diff --git a/dvc/repo/experiments/serialize.py b/dvc/repo/experiments/serialize.py index f9335676a55..886f5a77be5 100644 --- a/dvc/repo/experiments/serialize.py +++ b/dvc/repo/experiments/serialize.py @@ -69,7 +69,6 @@ def from_repo( onerror=onerror_collect, ) ) - meta = cls._gather_meta(repo) return cls( rev=rev, params=params, @@ -94,16 +93,9 @@ def from_repo( for out in repo.index.outs if not (out.is_metric or out.is_plot) }, - meta=meta, **kwargs, ) - @staticmethod - def _gather_meta(repo: "Repo") -> Dict[str, Any]: - return { - "has_checkpoints": any(stage.is_checkpoint for stage in repo.index.stages) - } - def dumpd(self) -> Dict[str, Any]: return asdict(self) diff --git a/dvc/repo/experiments/stash.py b/dvc/repo/experiments/stash.py index 332701bdcf7..1ac1cf86bb4 100644 --- a/dvc/repo/experiments/stash.py +++ b/dvc/repo/experiments/stash.py @@ -21,7 +21,7 @@ class ExpStashEntry(NamedTuple): is not pushed onto the stash ref. head_rev: HEAD Git commit to be checked out for this experiment. baseline_rev: Experiment baseline commit. - branch: Optional exp (checkpoint) branch name for this experiment. + branch: Optional branch name for this experiment. name: Optional exp name. """ diff --git a/dvc/repo/experiments/utils.py b/dvc/repo/experiments/utils.py index f97b2d68ca6..aefc405aab9 100644 --- a/dvc/repo/experiments/utils.py +++ b/dvc/repo/experiments/utils.py @@ -26,7 +26,6 @@ EXEC_APPLY, EXEC_BASELINE, EXEC_BRANCH, - EXEC_CHECKPOINT, EXPS_NAMESPACE, ITER_SKIP_NAMESPACES, STASHES, @@ -221,7 +220,6 @@ def exp_commits( def remove_exp_refs(scm: "Git", ref_infos: Iterable[ExpRefInfo]): exec_branch = scm.get_ref(EXEC_BRANCH, follow=False) exec_apply = scm.get_ref(EXEC_APPLY) - exec_checkpoint = scm.get_ref(EXEC_CHECKPOINT) for ref_info in ref_infos: ref = scm.get_ref(str(ref_info)) @@ -229,8 +227,6 @@ def remove_exp_refs(scm: "Git", ref_infos: Iterable[ExpRefInfo]): scm.remove_ref(EXEC_BRANCH) if exec_apply and exec_apply == ref: scm.remove_ref(EXEC_APPLY) - if exec_checkpoint and exec_checkpoint == ref: - scm.remove_ref(EXEC_CHECKPOINT) scm.remove_ref(str(ref_info)) diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index fdf4999b05f..a96f8445ea1 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -1,10 +1,8 @@ import logging -from functools import partial from typing import TYPE_CHECKING, Iterator, List -from dvc.exceptions import DvcException, ReproductionError +from dvc.exceptions import ReproductionError from dvc.repo.scm_context import scm_context -from dvc.stage.exceptions import CheckpointKilledError from . import locked @@ -17,21 +15,6 @@ def _reproduce_stage(stage: "Stage", **kwargs) -> List["Stage"]: - def _run_callback(repro_callback): - stage.dump(update_pipeline=False) - _track_stage(stage) - repro_callback([stage]) - - checkpoint_func = kwargs.pop("checkpoint_func", None) - if stage.is_checkpoint: - if checkpoint_func: - kwargs["checkpoint_func"] = partial(_run_callback, checkpoint_func) - else: - raise DvcException( - "Checkpoint stages are not supported in 'dvc repro'. " - "Checkpoint stages must be reproduced with 'dvc exp run'. " - ) - if stage.frozen and not stage.is_import: logger.warning( "%s is frozen. Its dependencies are not going to be reproduced.", @@ -43,10 +26,8 @@ def _run_callback(repro_callback): return [] if not kwargs.get("dry", False): - track = checkpoint_func is not None stage.dump(update_pipeline=False) - if track: - _track_stage(stage) + _track_stage(stage) return [stage] @@ -182,17 +163,11 @@ def _reproduce_stages( # noqa: C901 unchanged: List["Stage"] = [] # `ret` is used to add a cosmetic newline. ret: List["Stage"] = [] - checkpoint_func = kwargs.pop("checkpoint_func", None) - for i, stage in enumerate(steps): + for stage in steps: if ret: logger.info("") - if checkpoint_func: - kwargs["checkpoint_func"] = partial( - _repro_callback, checkpoint_func, unchanged - ) - try: ret = _reproduce_stage(stage, **kwargs) @@ -208,21 +183,6 @@ def _reproduce_stages( # noqa: C901 if ret: result.extend(ret) - except CheckpointKilledError: - result.append(stage) - logger.warning( - ( - "Checkpoint stage '%s' was interrupted remaining stages in" - " pipeline will not be reproduced." - ), - stage.addressing, - ) - logger.warning( - "skipped stages '%s'", - ", ".join(s.addressing for s in steps[i + 1 :]), - ) - - break except Exception as exc: # noqa: BLE001 raise ReproductionError(stage.addressing) from exc diff --git a/dvc/schema.py b/dvc/schema.py index 5ea24a26cd2..214f65b881f 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -53,7 +53,6 @@ **ANNOTATION_SCHEMA, # type: ignore[arg-type] Output.PARAM_CACHE: bool, Output.PARAM_PERSIST: bool, - Output.PARAM_CHECKPOINT: bool, Output.PARAM_REMOTE: str, Output.PARAM_PUSH: bool, } diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 8ee32dda571..1fe3d1a24ca 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -277,14 +277,6 @@ def is_repo_import(self) -> bool: def is_versioned_import(self) -> bool: return self.is_import and self.deps[0].fs.version_aware - @property - def is_checkpoint(self) -> bool: - """ - A stage containing checkpoint outs is always considered as changed - since the checkpoint out is a circular dependency. - """ - return any(out.checkpoint for out in self.outs) - def short_description(self) -> Optional["str"]: desc: Optional["str"] = None if self.desc: @@ -294,36 +286,20 @@ def short_description(self) -> Optional["str"]: return line.strip() return desc - def _read_env(self, out, checkpoint_func=None) -> Env: - env: Env = {} - if out.checkpoint and checkpoint_func: - from dvc.env import DVC_CHECKPOINT - - env.update({DVC_CHECKPOINT: "1"}) - return env - - def env(self, checkpoint_func=None) -> Env: + def env(self) -> Env: from dvc.env import DVC_ROOT env: Env = {} if self.repo: env.update({DVC_ROOT: self.repo.root_dir}) - for out in self.outs: - current = self._read_env(out, checkpoint_func=checkpoint_func) - if any( - env.get(key) != current.get(key) - for key in set(env.keys()).intersection(current.keys()) - ): - raise DvcException("Conflicting values for env variable") - env.update(current) return env def changed_deps(self, allow_missing: bool = False) -> bool: if self.frozen: return False - if self.is_callback or self.always_changed or self.is_checkpoint: + if self.is_callback or self.always_changed: return True return self._changed_deps(allow_missing=allow_missing) @@ -384,7 +360,7 @@ def changed(self, allow_missing: bool = False) -> bool: def remove_outs(self, ignore_remove=False, force=False) -> None: """Used mainly for `dvc remove --outs` and :func:`Stage.reproduce`.""" for out in self.outs: - if (out.persist or out.checkpoint) and not force: + if out.persist and not force: out.unprotect() continue @@ -525,7 +501,7 @@ def save_outs(self, allow_missing: bool = False): try: out.save() except OutputDoesNotExistError: - if not (allow_missing or out.checkpoint): + if not allow_missing: raise if old_out := old_versioned_outs.get(out.def_path): @@ -560,7 +536,7 @@ def commit(self, allow_missing=False, filter_info=None, **kwargs) -> None: try: out.commit(filter_info=filter_info, **kwargs) except OutputDoesNotExistError: - if not (allow_missing or out.checkpoint): + if not allow_missing: raise except CacheLinkError: link_failures.append(out.fs_path) @@ -579,7 +555,7 @@ def add_outs( # noqa: C901 try: out.add(filter_info, **kwargs) except (FileNotFoundError, OutputDoesNotExistError): - if not (allow_missing or out.checkpoint): + if not allow_missing: raise except CacheLinkError: link_failures.append(filter_info or out.fs_path) @@ -619,7 +595,7 @@ def run( # noqa: C901 self._check_missing_outputs() if not dry: - if kwargs.get("checkpoint_func", None) or no_download: + if no_download: allow_missing = True no_cache_outs = any( @@ -666,7 +642,7 @@ def checkout( for out in self.filter_outs(kwargs.get("filter_info")): key, outs = self._checkout( out, - allow_missing=allow_missing or self.is_checkpoint, + allow_missing=allow_missing, **kwargs, ) if key: @@ -721,7 +697,7 @@ def _status_outs(self, ret, filter_info) -> None: ret.append({"changed outs": outs_status}) def _status_always_changed(self, ret) -> None: - if self.is_callback or self.always_changed or self.is_checkpoint: + if self.is_callback or self.always_changed: ret.append("always changed") def _status_stage(self, ret) -> None: diff --git a/dvc/stage/exceptions.py b/dvc/stage/exceptions.py index 5f9ac50d6d3..d1c867c3189 100644 --- a/dvc/stage/exceptions.py +++ b/dvc/stage/exceptions.py @@ -9,14 +9,6 @@ def __init__(self, cmd, status=None): super().__init__(msg) -class CheckpointKilledError(DvcException): - def __init__(self, cmd, status=None): - msg = f"failed to finish: {cmd}" - if status is not None: - msg += f", exited with {status}" - super().__init__(msg) - - class StageFileDoesNotExistError(DvcException): DVC_IGNORED = "is dvc-ignored" DOES_NOT_EXIST = "does not exist" diff --git a/dvc/stage/monitor.py b/dvc/stage/monitor.py deleted file mode 100644 index d7aa02e119b..00000000000 --- a/dvc/stage/monitor.py +++ /dev/null @@ -1,123 +0,0 @@ -import functools -import logging -import os -import subprocess # nosec B404 -import threading -from dataclasses import dataclass -from typing import TYPE_CHECKING, Callable, List - -from dvc.stage.decorators import relock_repo -from dvc.stage.exceptions import CheckpointKilledError - -if TYPE_CHECKING: - from dvc.stage import Stage - - -logger = logging.getLogger(__name__) - - -@dataclass -class MonitorTask: - stage: "Stage" - execute: Callable - proc: subprocess.Popen - done: threading.Event = threading.Event() - updated: threading.Event = threading.Event() - - @property - def name(self) -> str: - raise NotImplementedError - - @property - def SIGNAL_FILE(self) -> str: # noqa: N802 - raise NotImplementedError - - @property - def error_cls(self) -> type: - raise NotImplementedError - - @property - def signal_path(self) -> str: - return os.path.join(self.stage.repo.tmp_dir, self.SIGNAL_FILE) - - def after_run(self): - pass - - -class CheckpointTask(MonitorTask): - name = "checkpoint" - SIGNAL_FILE = "DVC_CHECKPOINT" - error_cls = CheckpointKilledError - - def __init__(self, stage: "Stage", callback_func: Callable, proc: subprocess.Popen): - super().__init__( - stage, - functools.partial(CheckpointTask._run_callback, stage, callback_func), - proc, - ) - - @staticmethod - @relock_repo - def _run_callback(stage, callback_func): - stage.save(allow_missing=True) - stage.commit(allow_missing=True) - stage.unprotect_outs() - logger.debug("Running checkpoint callback for stage '%s'", stage) - callback_func() - - -class Monitor: - AWAIT: float = 1.0 - - def __init__(self, tasks: List[CheckpointTask]): - self.done = threading.Event() - self.tasks = tasks - self.monitor_thread = threading.Thread( - target=Monitor._loop, args=(self.tasks, self.done) - ) - - def __enter__(self): - self.monitor_thread.start() - - def __exit__(self, exc_type, exc_val, exc_tb): - self.done.set() - self.monitor_thread.join() - for t in self.tasks: - t.after_run() - - @staticmethod - def kill(proc): - if os.name == "nt": - return Monitor._kill_nt(proc) - proc.terminate() - proc.wait() - - @staticmethod - def _kill_nt(proc): - # windows stages are spawned with shell=True, proc is the shell process - # and not the actual stage process - we have to kill the entire tree - subprocess.call( # nosec B607, B603 - ["taskkill", "/F", "/T", "/PID", str(proc.pid)] - ) - - @staticmethod - def _loop(tasks: List[MonitorTask], done: threading.Event): - while True: - for task in tasks: - if os.path.exists(task.signal_path): - try: - task.execute() - task.updated.set() - except Exception: # pylint: disable=broad-except - logger.exception( - "Error running '%s' task, '%s' will be aborted", - task.name, - task.stage, - ) - Monitor.kill(task.proc) - finally: - logger.debug("Removing signal file for '%s' task", task.name) - os.remove(task.signal_path) - - if done.wait(Monitor.AWAIT): - return diff --git a/dvc/stage/run.py b/dvc/stage/run.py index de51a555619..aca0fbc2715 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -3,17 +3,12 @@ import signal import subprocess # nosec B404 import threading -from typing import TYPE_CHECKING, List -from dvc.stage.monitor import CheckpointTask, Monitor from dvc.utils import fix_env from .decorators import unlocked_repo from .exceptions import StageCmdFailedError -if TYPE_CHECKING: - from dvc.stage import Stage - logger = logging.getLogger(__name__) @@ -43,10 +38,9 @@ def _enforce_cmd_list(cmd): return cmd if isinstance(cmd, list) else cmd.splitlines() -def prepare_kwargs(stage, checkpoint_func=None, run_env=None): +def prepare_kwargs(stage, run_env=None): kwargs = {"cwd": stage.wdir, "env": fix_env(None), "close_fds": True} - kwargs["env"].update(stage.env(checkpoint_func=checkpoint_func)) if run_env: kwargs["env"].update(run_env) @@ -75,7 +69,7 @@ def get_executable(): return (os.getenv("SHELL") or "/bin/sh") if os.name != "nt" else None -def _run(stage: "Stage", executable, cmd, checkpoint_func, **kwargs): +def _run(executable, cmd, **kwargs): # pylint: disable=protected-access main_thread = isinstance( threading.current_thread(), @@ -90,36 +84,19 @@ def _run(stage: "Stage", executable, cmd, checkpoint_func, **kwargs): if main_thread: old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) - tasks = _get_monitor_tasks(stage, checkpoint_func, p) - - if tasks: - with Monitor(tasks): - p.communicate() - else: - p.communicate() + p.communicate() if p.returncode != 0: - for t in tasks: - if t.updated.is_set(): - raise t.error_cls(cmd, p.returncode) raise StageCmdFailedError(cmd, p.returncode) finally: if old_handler: signal.signal(signal.SIGINT, old_handler) -def _get_monitor_tasks(stage, checkpoint_func, proc) -> List[CheckpointTask]: - result = [] - if checkpoint_func: - result.append(CheckpointTask(stage, checkpoint_func, proc)) - - return result - - -def cmd_run(stage, dry=False, checkpoint_func=None, run_env=None): +def cmd_run(stage, dry=False, run_env=None): logger.info("Running stage '%s':", stage.addressing) commands = _enforce_cmd_list(stage.cmd) - kwargs = prepare_kwargs(stage, checkpoint_func=checkpoint_func, run_env=run_env) + kwargs = prepare_kwargs(stage, run_env=run_env) executable = get_executable() if not dry: @@ -130,7 +107,7 @@ def cmd_run(stage, dry=False, checkpoint_func=None, run_env=None): if dry: continue - _run(stage, executable, cmd, checkpoint_func=checkpoint_func, **kwargs) + _run(executable, cmd, **kwargs) def _pull_missing_deps(stage): @@ -143,12 +120,11 @@ def run_stage( stage, dry=False, force=False, - checkpoint_func=None, run_env=None, allow_missing: bool = False, **kwargs, ): - if not (force or checkpoint_func): + if not force: if allow_missing and kwargs.get("pull") and not dry: _pull_missing_deps(stage) @@ -163,4 +139,4 @@ def run_stage( stage.save_deps() run = cmd_run if dry else unlocked_repo(cmd_run) - run(stage, dry=dry, checkpoint_func=checkpoint_func, run_env=run_env) + run(stage, dry=dry, run_env=run_env) diff --git a/dvc/stage/serialize.py b/dvc/stage/serialize.py index 8c6a7de2e99..0e163493518 100644 --- a/dvc/stage/serialize.py +++ b/dvc/stage/serialize.py @@ -34,7 +34,6 @@ PARAM_METRIC = Output.PARAM_METRIC PARAM_PLOT = Output.PARAM_PLOT PARAM_PERSIST = Output.PARAM_PERSIST -PARAM_CHECKPOINT = Output.PARAM_CHECKPOINT PARAM_DESC = Annotation.PARAM_DESC PARAM_REMOTE = Output.PARAM_REMOTE PARAM_PUSH = Output.PARAM_PUSH @@ -49,8 +48,6 @@ def _get_flags(out): if not out.use_cache: yield PARAM_CACHE, False - if out.checkpoint: - yield PARAM_CHECKPOINT, True if out.persist: yield PARAM_PERSIST, True if out.plot and isinstance(out.plot, dict): diff --git a/dvc/stage/utils.py b/dvc/stage/utils.py index 6d2e2ade462..568937f4883 100644 --- a/dvc/stage/utils.py +++ b/dvc/stage/utils.py @@ -62,7 +62,6 @@ def fill_stage_outputs(stage, **kwargs): "plots_persist_no_cache", "outs_no_cache", "outs", - "checkpoints", ] stage.outs = [] @@ -75,7 +74,6 @@ def fill_stage_outputs(stage, **kwargs): persist="persist" in key, metric="metrics" in key, plot="plots" in key, - checkpoint="checkpoints" in key, ) @@ -169,7 +167,6 @@ def compute_md5(stage): stage.PARAM_FROZEN, Output.PARAM_METRIC, Output.PARAM_PERSIST, - Output.PARAM_CHECKPOINT, Meta.PARAM_ISEXEC, Meta.PARAM_SIZE, Meta.PARAM_NFILES, @@ -240,7 +237,6 @@ def prepare_file_path(kwargs) -> str: kwargs.get("plots_no_cache", []), kwargs.get("outs_persist", []), kwargs.get("outs_persist_no_cache", []), - kwargs.get("checkpoints", []), ) ) diff --git a/tests/func/experiments/conftest.py b/tests/func/experiments/conftest.py index 7358dc09895..df4cffb4de5 100644 --- a/tests/func/experiments/conftest.py +++ b/tests/func/experiments/conftest.py @@ -1,9 +1,7 @@ import pytest from tests.unit.repo.experiments.conftest import ( # noqa, pylint disable=unused-argument - checkpoint_stage, exp_stage, - failed_checkpoint_stage, failed_exp_stage, session_app, session_queue, diff --git a/tests/func/experiments/test_checkpoints.py b/tests/func/experiments/test_checkpoints.py deleted file mode 100644 index e548efd87e1..00000000000 --- a/tests/func/experiments/test_checkpoints.py +++ /dev/null @@ -1,269 +0,0 @@ -import logging - -import pytest -from funcy import first - -from dvc.config import NoRemoteError -from dvc.env import DVC_EXP_AUTO_PUSH, DVC_EXP_GIT_REMOTE -from dvc.exceptions import DvcException -from dvc.repo.experiments.exceptions import MultipleBranchError -from dvc.repo.experiments.executor.base import BaseExecutor -from dvc.repo.experiments.refs import EXEC_APPLY, EXEC_CHECKPOINT -from dvc.repo.experiments.utils import exp_refs_by_rev -from dvc.scm import InvalidRemoteSCMRepo, RevError - - -@pytest.mark.parametrize("links", ["reflink,copy", "hardlink,symlink"]) -def test_new_checkpoint(tmp_dir, scm, dvc, checkpoint_stage, mocker, workspace, links): - with dvc.config.edit() as conf: - conf["cache"]["type"] = links - - new_mock = mocker.spy(dvc.experiments, "new") - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - exp = first(results) - - new_mock.assert_called_once() - with dvc.switch(exp): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 2" - - if workspace: - assert scm.get_ref(EXEC_APPLY) == exp - assert scm.get_ref(EXEC_CHECKPOINT) == exp - if workspace: - assert (tmp_dir / "foo").read_text().strip() == str(checkpoint_stage.iterations) - assert (tmp_dir / "metrics.yaml").read_text().strip() == "foo: 2" - - -@pytest.mark.parametrize("checkpoint_resume", [None, "foo"]) -def test_resume_checkpoint( - tmp_dir, scm, dvc, checkpoint_stage, checkpoint_resume, workspace -): - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - - with pytest.raises(RevError, match="unknown Git revision 'abc1234'"): - dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume="abc1234", - tmp_dir=not workspace, - ) - - if checkpoint_resume: - checkpoint_resume = first(results) - - if not workspace: - dvc.experiments.apply(first(results)) - results = dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume=checkpoint_resume, - tmp_dir=not workspace, - ) - exp = first(results) - - with dvc.switch(exp): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 2" - - if workspace: - assert scm.get_ref(EXEC_APPLY) == exp - assert scm.get_ref(EXEC_CHECKPOINT) == exp - - -def test_reset_checkpoint(tmp_dir, scm, dvc, checkpoint_stage, caplog, workspace): - dvc.experiments.run(checkpoint_stage.addressing, tmp_dir=not workspace) - - results = dvc.experiments.run( - checkpoint_stage.addressing, - params=["foo=2"], - tmp_dir=not workspace, - reset=True, - ) - exp = first(results) - - with dvc.switch(exp): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 2" - - if workspace: - assert scm.get_ref(EXEC_APPLY) == exp - assert scm.get_ref(EXEC_CHECKPOINT) == exp - - -def test_resume_branch(tmp_dir, scm, dvc, checkpoint_stage, workspace): - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - branch_rev = first(results) - if not workspace: - dvc.experiments.apply(branch_rev) - - results = dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume=branch_rev, - tmp_dir=not workspace, - ) - checkpoint_a = first(results) - - dvc.experiments.apply(branch_rev) - results = dvc.experiments.run( - checkpoint_stage.addressing, - checkpoint_resume=branch_rev, - params=["foo=100"], - tmp_dir=not workspace, - ) - checkpoint_b = first(results) - - with dvc.switch(checkpoint_a): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 2" - - with dvc.switch(checkpoint_b): - fs = dvc.dvcfs - with fs.open("foo") as fobj: - assert fobj.read().strip() == str(2 * checkpoint_stage.iterations) - with fs.open("metrics.yaml") as fobj: - assert fobj.read().strip() == "foo: 100" - - with pytest.raises(MultipleBranchError): - dvc.experiments.get_branch_by_rev(branch_rev) - - assert branch_rev == dvc.experiments.scm.gitpython.repo.git.merge_base( - checkpoint_a, checkpoint_b - ) - - -def test_resume_non_head_checkpoint(tmp_dir, scm, dvc, checkpoint_stage, workspace): - orig_head = scm.get_rev() - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - checkpoint_head = first(results) - orig_branch = dvc.experiments.get_branch_by_rev(checkpoint_head) - - rev = list(scm.branch_revs(checkpoint_head, orig_head))[-1] - dvc.experiments.apply(rev) - - with pytest.raises(DvcException, match="Nothing to do for unchanged checkpoint"): - dvc.experiments.run(checkpoint_stage.addressing, tmp_dir=not workspace) - - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=100"], tmp_dir=not workspace - ) - new_head = first(results) - assert orig_branch != dvc.experiments.get_branch_by_rev(new_head) - - -@pytest.fixture -def clear_env(monkeypatch): - yield - monkeypatch.delenv(DVC_EXP_GIT_REMOTE, raising=False) - monkeypatch.delenv(DVC_EXP_AUTO_PUSH, raising=False) - - -@pytest.mark.parametrize("use_url", [True, False]) -def test_auto_push_during_iterations( - tmp_dir, - scm, - dvc, - checkpoint_stage, - git_upstream, - local_remote, - use_url, - monkeypatch, - mocker, - clear_env, -): - # set up remote repo - remote = git_upstream.url if use_url else git_upstream.remote - git_upstream.tmp_dir.scm.fetch_refspecs(str(tmp_dir), ["master:master"]) - monkeypatch.setenv(DVC_EXP_GIT_REMOTE, remote) - auto_push_spy = mocker.spy(BaseExecutor, "_auto_push") - - # without auto push - results = dvc.experiments.run(checkpoint_stage.addressing) - assert auto_push_spy.call_count == 0 - - # add auto push - monkeypatch.setenv(DVC_EXP_AUTO_PUSH, "true") - results = dvc.experiments.run(checkpoint_stage.addressing) - assert (tmp_dir / "foo").read_text() == "4" - exp = first(results) - ref_info = first(exp_refs_by_rev(scm, exp)) - assert git_upstream.tmp_dir.scm.get_ref(str(ref_info)) == exp - - # Assert 3 pushes: 2 checkpoints and final commit - assert auto_push_spy.call_count == 3 - assert auto_push_spy.call_args[0][2] == remote - - -def test_auto_push_error_url( - dvc, scm, checkpoint_stage, local_remote, monkeypatch, clear_env -): - monkeypatch.setenv(DVC_EXP_GIT_REMOTE, "true") - monkeypatch.setenv(DVC_EXP_AUTO_PUSH, "true") - with pytest.raises(InvalidRemoteSCMRepo): - dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - - -def test_auto_push_no_remote( - dvc, scm, checkpoint_stage, git_upstream, monkeypatch, clear_env -): - monkeypatch.setenv(DVC_EXP_GIT_REMOTE, git_upstream.url) - monkeypatch.setenv(DVC_EXP_AUTO_PUSH, "true") - with pytest.raises(NoRemoteError): - dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - - -def test_auto_push_self_remote( - tmp_dir, - dvc, - scm, - checkpoint_stage, - local_remote, - caplog, - monkeypatch, - clear_env, -): - root_dir = str(tmp_dir) - monkeypatch.setenv(DVC_EXP_GIT_REMOTE, root_dir) - monkeypatch.setenv(DVC_EXP_AUTO_PUSH, "true") - assert dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) != {} - - with caplog.at_level(logging.WARNING, logger="dvc.repo.experiments"): - assert ( - f"'{root_dir}' points to the current Git repo, experiment " - "Git refs will not be pushed. But DVC cache and run cache will " - "automatically be pushed to the default DVC remote (if any) " - "on each experiment commit." in caplog.text - ) - - -def test_tmp_dir_failed_checkpoint(tmp_dir, scm, dvc, failed_checkpoint_stage, caplog): - dvc.experiments.run( - failed_checkpoint_stage.addressing, params=["foo=2"], tmp_dir=True - ) - - result = dvc.experiments.show()[1] - assert len(result.experiments) == 1 - # Assert 2 checkpoints and final commit - assert len(result.experiments[0]) == 3 - assert ( - "Checkpoint stage 'failed-checkpoint-file' was interrupted remaining " - "stages in pipeline will not be reproduced." in caplog.text - ) diff --git a/tests/func/experiments/test_remote.py b/tests/func/experiments/test_remote.py index 2f09721f35f..ba39f2335e2 100644 --- a/tests/func/experiments/test_remote.py +++ b/tests/func/experiments/test_remote.py @@ -1,5 +1,3 @@ -import os - import pytest from funcy import first @@ -110,24 +108,6 @@ def test_push_diverged(tmp_dir, scm, dvc, git_upstream, exp_stage): assert git_upstream.tmp_dir.scm.get_ref(str(ref_info)) == exp -def test_push_checkpoint(tmp_dir, scm, dvc, git_upstream, checkpoint_stage): - results = dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - exp_a = first(results) - ref_info_a = first(exp_refs_by_rev(scm, exp_a)) - - dvc.experiments.push(git_upstream.remote, [ref_info_a.name], force=True) - assert git_upstream.tmp_dir.scm.get_ref(str(ref_info_a)) == exp_a - - results = dvc.experiments.run(checkpoint_stage.addressing, checkpoint_resume=exp_a) - exp_b = first(results) - ref_info_b = first(exp_refs_by_rev(scm, exp_b)) - - tmp_dir.scm_gen("new", "new", commit="new") - - dvc.experiments.push(git_upstream.remote, [ref_info_b.name], force=True) - assert git_upstream.tmp_dir.scm.get_ref(str(ref_info_b)) == exp_b - - def test_push_ambiguous_name(tmp_dir, scm, dvc, git_upstream, exp_stage): from dvc.exceptions import InvalidArgumentError @@ -293,23 +273,6 @@ def test_pull_diverged(tmp_dir, scm, dvc, git_downstream, exp_stage): assert git_downstream.tmp_dir.scm.get_ref(str(ref_info)) == exp -def test_pull_checkpoint(tmp_dir, scm, dvc, git_downstream, checkpoint_stage): - results = dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - exp_a = first(results) - ref_info_a = first(exp_refs_by_rev(scm, exp_a)) - - downstream_exp = git_downstream.tmp_dir.dvc.experiments - downstream_exp.pull(git_downstream.remote, [ref_info_a.name], force=True) - assert git_downstream.tmp_dir.scm.get_ref(str(ref_info_a)) == exp_a - - results = dvc.experiments.run(checkpoint_stage.addressing, checkpoint_resume=exp_a) - exp_b = first(results) - ref_info_b = first(exp_refs_by_rev(scm, exp_b)) - - downstream_exp.pull(git_downstream.remote, [ref_info_b.name], force=True) - assert git_downstream.tmp_dir.scm.get_ref(str(ref_info_b)) == exp_b - - def test_pull_ambiguous_name(tmp_dir, scm, dvc, git_downstream, exp_stage): from dvc.exceptions import InvalidArgumentError @@ -335,36 +298,6 @@ def test_pull_ambiguous_name(tmp_dir, scm, dvc, git_downstream, exp_stage): assert git_downstream.tmp_dir.scm.get_ref(str(ref_info_a)) == exp_a -def test_push_pull_cache( - tmp_dir, scm, dvc, git_upstream, checkpoint_stage, local_remote -): - from dvc.utils.fs import remove - from tests.func.test_diff import digest - - remote = git_upstream.remote - results = dvc.experiments.run(checkpoint_stage.addressing, params=["foo=2"]) - exp = first(results) - ref_info = first(exp_refs_by_rev(scm, exp)) - - dvc.experiments.push(remote, [ref_info.name], push_cache=True) - for x in range(2, checkpoint_stage.iterations + 1): - hash_ = digest(str(x)) - path = os.path.join(local_remote.url, hash_[:2], hash_[2:]) - assert os.path.exists(path) - with open(path, encoding="utf-8") as f: - assert f.read() == str(x) - - remove(dvc.cache.local.path) - - dvc.experiments.pull(remote, [ref_info.name], pull_cache=True) - for x in range(2, checkpoint_stage.iterations + 1): - hash_ = digest(str(x)) - path = os.path.join(dvc.cache.local.path, hash_[:2], hash_[2:]) - assert os.path.exists(path) - with open(path, encoding="utf-8") as f: - assert f.read() == str(x) - - def test_auth_error_list(tmp_dir, scm, dvc, http_auth_patch): from dvc.scm import GitAuthError diff --git a/tests/func/experiments/test_set_params.py b/tests/func/experiments/test_set_params.py index b0dde61c4b4..7d1b750a44e 100644 --- a/tests/func/experiments/test_set_params.py +++ b/tests/func/experiments/test_set_params.py @@ -118,7 +118,6 @@ def test_hydra_sweep( patched.assert_any_call( mocker.ANY, params=e, - reset=True, targets=None, copy_paths=None, message=None, diff --git a/tests/func/experiments/test_show.py b/tests/func/experiments/test_show.py index a36422baee8..04ca5d3e92b 100644 --- a/tests/func/experiments/test_show.py +++ b/tests/func/experiments/test_show.py @@ -243,40 +243,6 @@ def test_show_failed_experiment(tmp_dir, scm, dvc, failed_exp_stage, test_queue) } -@pytest.mark.vscode -@pytest.mark.parametrize("workspace", [True, False]) -def test_show_checkpoint(tmp_dir, scm, dvc, checkpoint_stage, capsys, workspace): - results = dvc.experiments.run( - checkpoint_stage.addressing, params=["foo=2"], tmp_dir=not workspace - ) - exp_rev = first(results) - - baseline = dvc.experiments.show()[1] - assert baseline.data.meta.get("has_checkpoints") - checkpoints = first(baseline.experiments) - # 2 checkpoints + final commit - assert len(checkpoints) == checkpoint_stage.iterations + 1 - assert first(checkpoints).rev == exp_rev - - capsys.readouterr() - assert main(["exp", "show", "--no-pager"]) == 0 - cap = capsys.readouterr() - - for i, exp in enumerate(checkpoints): - rev = exp.rev - if i == 0: - name = dvc.experiments.get_exact_name([rev])[rev] - name = f"{rev[:7]} [{name}]" - fs = "╓" - elif i == len(checkpoints) - 1: - name = rev[:7] - fs = "╨" - else: - name = rev[:7] - fs = "╟" - assert f"{fs} {name}" in cap.out - - def test_show_filter( tmp_dir, scm, diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index df93a549610..716f4c444b7 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -1222,4 +1222,4 @@ def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy, mocker): assert dvc.status([stage.addressing]) == {stage.addressing: ["changed checksum"]} assert dvc.reproduce(stage.addressing)[0] == stage - m.assert_called_once_with(stage, checkpoint_func=None, dry=False, run_env=None) + m.assert_called_once_with(stage, dry=False, run_env=None) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index 7c85634194f..f4709839c81 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -136,7 +136,7 @@ def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy, mocker): assert dvc.status([target]) == {target: ["changed command"]} assert dvc.reproduce(target)[0] == stage - m.assert_called_once_with(stage, checkpoint_func=None, dry=False, run_env=None) + m.assert_called_once_with(stage, dry=False, run_env=None) def test_repro_when_new_deps_is_added_in_dvcfile(tmp_dir, dvc, run_copy, copy_script): diff --git a/tests/func/test_stage.py b/tests/func/test_stage.py index 242155d3218..1fff601967a 100644 --- a/tests/func/test_stage.py +++ b/tests/func/test_stage.py @@ -7,9 +7,8 @@ from dvc.exceptions import OutputDuplicationError from dvc.fs import LocalFileSystem from dvc.output import Output -from dvc.repo import Repo, lock_repo +from dvc.repo import Repo from dvc.stage import PipelineStage, Stage -from dvc.stage.run import run_stage from dvc.stage.utils import compute_md5 from dvc.utils import dict_md5 from dvc.utils.serialize import dump_yaml, load_yaml @@ -318,24 +317,6 @@ def test_stage_remove_pointer_stage(tmp_dir, dvc, run_copy): assert not (tmp_dir / stage.relpath).exists() -@pytest.mark.parametrize("checkpoint", [True, False]) -def test_stage_run_checkpoint(tmp_dir, dvc, mocker, checkpoint): - stage = Stage(dvc, "stage.dvc", cmd="mycmd arg1 arg2") - mocker.patch.object(stage, "save") - - mock_cmd_run = mocker.patch("dvc.stage.run.cmd_run") - if checkpoint: - callback = mocker.Mock() - else: - callback = None - - with lock_repo(dvc): - run_stage(stage, checkpoint_func=callback) - mock_cmd_run.assert_called_with( - stage, checkpoint_func=callback, dry=False, run_env=None - ) - - def test_stage_add_duplicated_output(tmp_dir, dvc): tmp_dir.dvc_gen("foo", "foo") dvc.add("foo") diff --git a/tests/unit/command/test_experiments.py b/tests/unit/command/test_experiments.py index 98065a74313..9e1bb1ea6e8 100644 --- a/tests/unit/command/test_experiments.py +++ b/tests/unit/command/test_experiments.py @@ -126,8 +126,6 @@ def test_experiments_run(dvc, scm, mocker): "run_all": False, "jobs": 1, "tmp_dir": False, - "checkpoint_resume": None, - "reset": False, "machine": None, "copy_paths": [], "message": None, diff --git a/tests/unit/command/test_run.py b/tests/unit/command/test_run.py index d328dedd14d..b9fea4bd260 100644 --- a/tests/unit/command/test_run.py +++ b/tests/unit/command/test_run.py @@ -35,8 +35,6 @@ def test_run(mocker, dvc): "outs-persist", "--outs-persist-no-cache", "outs-persist-no-cache", - "--checkpoints", - "checkpoints", "--always-changed", "--params", "file:param1,param2", @@ -66,7 +64,6 @@ def test_run(mocker, dvc): plots_no_cache=["plots-no-cache"], outs_persist=["outs-persist"], outs_persist_no_cache=["outs-persist-no-cache"], - checkpoints=["checkpoints"], params=["file:param1,param2", "param3"], fname="file", wdir="wdir", @@ -99,7 +96,6 @@ def test_run_args_from_cli(mocker, dvc): plots_no_cache=[], outs_persist=[], outs_persist_no_cache=[], - checkpoints=[], params=[], fname=None, wdir=None, @@ -132,7 +128,6 @@ def test_run_args_with_spaces(mocker, dvc): plots_no_cache=[], outs_persist=[], outs_persist_no_cache=[], - checkpoints=[], params=[], fname=None, wdir=None, diff --git a/tests/unit/command/test_stage.py b/tests/unit/command/test_stage.py index 2b090a31bd1..c902187a8bd 100644 --- a/tests/unit/command/test_stage.py +++ b/tests/unit/command/test_stage.py @@ -42,8 +42,6 @@ def test_stage_add(mocker, dvc, command, parsed_command): "outs-persist", "--outs-persist-no-cache", "outs-persist-no-cache", - "--checkpoints", - "checkpoints", "--always-changed", "--params", "file:param1,param2", @@ -79,7 +77,6 @@ def test_stage_add(mocker, dvc, command, parsed_command): wdir="wdir", outs_persist=["outs-persist"], outs_persist_no_cache=["outs-persist-no-cache"], - checkpoints=["checkpoints"], always_changed=True, external=True, desc="description", diff --git a/tests/unit/repo/experiments/conftest.py b/tests/unit/repo/experiments/conftest.py index af0a5439ad1..65b84f366ef 100644 --- a/tests/unit/repo/experiments/conftest.py +++ b/tests/unit/repo/experiments/conftest.py @@ -1,53 +1,10 @@ from functools import partial -from textwrap import dedent import pytest from dvc_task.app import FSApp DEFAULT_ITERATIONS = 2 -CHECKPOINT_SCRIPT_FORMAT = dedent( - """\ - import os - import sys - import shutil - - from dvc.api import make_checkpoint - - checkpoint_file = {} - checkpoint_iterations = int({}) - if os.path.exists(checkpoint_file): - with open(checkpoint_file) as fobj: - try: - value = int(fobj.read()) - except ValueError: - value = 0 - else: - with open(checkpoint_file, "w"): - pass - value = 0 - - shutil.copyfile({}, {}) - - if os.getenv("DVC_CHECKPOINT"): - for index in range(checkpoint_iterations): - value += 1 - {} - with open(checkpoint_file, "w") as fobj: - fobj.write(str(value)) - make_checkpoint() -""" -) -CHECKPOINT_SCRIPT = CHECKPOINT_SCRIPT_FORMAT.format( - "sys.argv[1]", "sys.argv[2]", "sys.argv[3]", "sys.argv[4]", "" -) -FAILED_CHECKPOINT_SCRIPT = CHECKPOINT_SCRIPT_FORMAT.format( - "sys.argv[1]", - "sys.argv[2]", - "sys.argv[3]", - "sys.argv[4]", - "if index == (checkpoint_iterations - 2): raise Exception", -) @pytest.fixture @@ -74,48 +31,6 @@ def exp_stage(tmp_dir, scm, dvc, copy_script): return stage -@pytest.fixture -def checkpoint_stage(tmp_dir, scm, dvc, mocker): - mocker.patch("dvc.stage.run.Monitor.AWAIT", 0.01) - - tmp_dir.gen("checkpoint.py", CHECKPOINT_SCRIPT) - tmp_dir.gen("params.yaml", "foo: 1") - stage = dvc.run( - cmd=f"python checkpoint.py foo {DEFAULT_ITERATIONS} params.yaml metrics.yaml", - metrics_no_cache=["metrics.yaml"], - params=["foo"], - checkpoints=["foo"], - deps=["checkpoint.py"], - no_exec=True, - name="checkpoint-file", - ) - scm.add(["dvc.yaml", "checkpoint.py", "params.yaml", ".gitignore"]) - scm.commit("init") - stage.iterations = DEFAULT_ITERATIONS - return stage - - -@pytest.fixture -def failed_checkpoint_stage(tmp_dir, scm, dvc, mocker): - mocker.patch("dvc.stage.run.Monitor.AWAIT", 0.01) - - tmp_dir.gen("checkpoint.py", FAILED_CHECKPOINT_SCRIPT) - tmp_dir.gen("params.yaml", "foo: 1") - stage = dvc.run( - cmd=f"python checkpoint.py foo {DEFAULT_ITERATIONS+2} params.yaml metrics.yaml", - metrics_no_cache=["metrics.yaml"], - params=["foo"], - checkpoints=["foo"], - deps=["checkpoint.py"], - no_exec=True, - name="failed-checkpoint-file", - ) - scm.add(["dvc.yaml", "checkpoint.py", "params.yaml", ".gitignore"]) - scm.commit("init") - stage.iterations = DEFAULT_ITERATIONS - return stage - - @pytest.fixture def failed_exp_stage(tmp_dir, scm, dvc, copy_script): tmp_dir.gen("params.yaml", "foo: 1") diff --git a/tests/unit/repo/experiments/test_show.py b/tests/unit/repo/experiments/test_show.py new file mode 100644 index 00000000000..9e7660ae2ac --- /dev/null +++ b/tests/unit/repo/experiments/test_show.py @@ -0,0 +1,29 @@ +from funcy import first + +from dvc.repo.experiments.show import get_branch_names, update_names + + +def test_get_show_branch(tmp_dir, scm, dvc, exp_stage): + new_branch = "new" + + baseline = scm.get_rev() + base_branch = scm.active_branch() + results = dvc.experiments.run(exp_stage.addressing, params=["foo=2"]) + exp_a = first(results) + dvc.experiments.branch(exp_a, new_branch) + + scm.checkout(new_branch, force=True) + + result = { + "workspace": {"baseline": {"data": {}}}, + exp_a: {"baseline": {"data": {}}}, + baseline: {"baseline": {"data": {}}, exp_a: {"data": {}}}, + } + + branch_names = get_branch_names(scm, ["workspace", exp_a, baseline]) + assert branch_names == {exp_a: new_branch, baseline: base_branch} + + update_names(dvc, branch_names, result) + assert result[exp_a]["baseline"]["data"] == {"name": new_branch} + assert result[baseline]["baseline"]["data"] == {"name": base_branch} + assert result[baseline][exp_a]["data"] == {"name": new_branch} diff --git a/tests/unit/stage/test_stage.py b/tests/unit/stage/test_stage.py index bec4a5a546f..2227cefdaf1 100644 --- a/tests/unit/stage/test_stage.py +++ b/tests/unit/stage/test_stage.py @@ -122,22 +122,3 @@ def test_external_outs(tmp_path_factory, dvc): create_stage(Stage, dvc, "path.dvc", outs=["remote://myremote/foo"]) create_stage(Stage, dvc, "path.dvc", outs=[os.fspath(foo)], external=True) - - -def test_env(dvc, mocker): - from dvc.env import DVC_ROOT - from dvc.exceptions import DvcException - from dvc.stage import create_stage - - stage = create_stage(Stage, dvc, "path.dvc", outs=["foo", "bar"]) - - mocker.patch.object(stage, "_read_env", return_value={"foo": "foo"}) - env = stage.env() - assert env == {DVC_ROOT: dvc.root_dir, "foo": "foo"} - - def mock_read_env(out, **kwargs): - return {"foo": str(out)} - - mocker.patch.object(stage, "_read_env", mock_read_env) - with pytest.raises(DvcException, match="Conflicting values for env variable"): - _ = stage.env()