Skip to content

Commit

Permalink
exp: Drop checkpoints.
Browse files Browse the repository at this point in the history
Closes #9221
  • Loading branch information
daavoo committed Mar 29, 2023
1 parent eef8795 commit 4e03644
Show file tree
Hide file tree
Showing 44 changed files with 52 additions and 1,573 deletions.
3 changes: 1 addition & 2 deletions dvc/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

from .data import open # pylint: disable=redefined-builtin
from .data import get_url, read
from .experiments import exp_save, make_checkpoint
from .experiments import exp_save
from .show import metrics_show, params_show

__all__ = [
"exp_save",
"get_url",
"make_checkpoint",
"open",
"params_show",
"metrics_show",
Expand Down
31 changes: 0 additions & 31 deletions dvc/api/experiments.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,6 @@
import builtins
import os
from time import sleep
from typing import List, Optional

from dvc.env import DVC_CHECKPOINT, DVC_ROOT
from dvc.repo import Repo
from dvc.stage.monitor import CheckpointTask


def make_checkpoint():
"""
Signal DVC to create a checkpoint experiment.
If the current process is being run from DVC, this function will block
until DVC has finished creating the checkpoint. Otherwise, this function
will return immediately.
"""
if os.getenv(DVC_CHECKPOINT) is None:
return

root_dir = os.getenv(DVC_ROOT, Repo.find_root())
signal_file = os.path.join(
root_dir, Repo.DVC_DIR, "tmp", CheckpointTask.SIGNAL_FILE
)

with builtins.open(signal_file, "w", encoding="utf-8") as fobj:
# NOTE: force flushing/writing empty file to disk, otherwise when
# run in certain contexts (pytest) file may not actually be written
fobj.write("")
fobj.flush()
os.fsync(fobj.fileno())
while os.path.exists(signal_file):
sleep(0.1)


def exp_save(
Expand Down
32 changes: 0 additions & 32 deletions dvc/commands/experiments/run.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import argparse
import logging

from dvc.cli import completion
from dvc.cli.utils import append_doc_link
from dvc.commands.repro import CmdRepro
from dvc.commands.repro import add_arguments as add_repro_arguments
from dvc.exceptions import InvalidArgumentError
from dvc.ui import ui

logger = logging.getLogger(__name__)

Expand All @@ -15,25 +12,12 @@ class CmdExperimentsRun(CmdRepro):
def run(self):
from dvc.compare import show_metrics

if self.args.checkpoint_resume:
if self.args.reset:
raise InvalidArgumentError("--reset and --rev are mutually exclusive.")
if not (self.args.queue or self.args.tmp_dir):
raise InvalidArgumentError(
"--rev can only be used in conjunction with --queue or --temp."
)

if self.args.reset:
ui.write("Any existing checkpoints will be reset and re-run.")

results = self.repo.experiments.run(
name=self.args.name,
queue=self.args.queue,
run_all=self.args.run_all,
jobs=self.args.jobs,
params=self.args.set_param,
checkpoint_resume=self.args.checkpoint_resume,
reset=self.args.reset,
tmp_dir=self.args.tmp_dir,
machine=self.args.machine,
**self._common_kwargs,
Expand All @@ -57,22 +41,6 @@ def add_parser(experiments_subparsers, parent_parser):
formatter_class=argparse.RawDescriptionHelpFormatter,
)
_add_run_common(experiments_run_parser)
experiments_run_parser.add_argument(
"-r",
"--rev",
type=str,
dest="checkpoint_resume",
help=(
"Continue the specified checkpoint experiment. Can only be used "
"in conjunction with --queue or --temp."
),
metavar="<experiment_rev>",
).complete = completion.EXPERIMENT
experiments_run_parser.add_argument(
"--reset",
action="store_true",
help="Reset existing checkpoints and restart the experiment.",
)
experiments_run_parser.set_defaults(func=CmdExperimentsRun)


Expand Down
28 changes: 2 additions & 26 deletions dvc/commands/experiments/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,13 @@ def _collect_names(all_experiments, **kwargs):


experiment_types = {
"checkpoint_tip": "│ ╓",
"checkpoint_commit": "│ ╟",
"checkpoint_base": "├─╨",
"branch_commit": "├──",
"branch_base": "└──",
"baseline": "",
}


def _collect_rows( # noqa: C901, PLR0912, PLR0913, PLR0915
def _collect_rows( # noqa: C901, PLR0913
base_rev,
experiments,
all_headers,
Expand All @@ -86,7 +83,6 @@ def _collect_rows( # noqa: C901, PLR0912, PLR0913, PLR0915
reverse = sort_order == "desc"
experiments = _sort_exp(experiments, sort_path, sort_name, sort_type, reverse)

new_checkpoint = True
for i, (rev, results) in enumerate(experiments.items()):
fill_value = FILL_VALUE_ERRORED if results.get("error") else fill_value
row_dict = {k: fill_value for k in all_headers}
Expand All @@ -104,30 +100,14 @@ def _collect_rows( # noqa: C901, PLR0912, PLR0913, PLR0915
else:
name_rev = rev[:7]

tip = exp.get("checkpoint_tip")
parent_rev = exp.get("checkpoint_parent", "")
parent_exp = experiments.get(parent_rev, {}).get("data", {})
parent_tip = parent_exp.get("checkpoint_tip")

parent = ""
if is_baseline:
typ = "baseline"
elif tip:
if tip == parent_tip:
typ = "checkpoint_tip" if new_checkpoint else "checkpoint_commit"
elif parent_rev == base_rev:
typ = "checkpoint_base"
else:
typ = "checkpoint_commit"
parent = parent_rev[:7]
elif i < len(experiments) - 1:
typ = "branch_commit"
else:
typ = "branch_base"

if not is_baseline:
new_checkpoint = not (tip and tip == parent_tip)

row_dict["Experiment"] = exp.get("name", "")
row_dict["rev"] = name_rev
row_dict["typ"] = typ
Expand Down Expand Up @@ -191,12 +171,8 @@ def _sort_column(sort_by, metric_names, param_names):

def _sort_exp(experiments, sort_path, sort_name, typ, reverse):
def _sort(item):
rev, exp = item
_, exp = item
exp_data = exp.get("data", {})
tip = exp_data.get("checkpoint_tip")
if tip and tip != rev:
# Sort checkpoint experiments by tip commit
return _sort((tip, experiments[tip]))
data = exp_data.get(typ, {}).get(sort_path, {}).get("data", {})
val = flatten(data).get(sort_name)
return val is None, val
Expand Down
3 changes: 1 addition & 2 deletions dvc/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def run(self):
self.args.plots_no_cache,
self.args.outs_persist,
self.args.outs_persist_no_cache,
self.args.checkpoints,
self.args.params,
self.args.command,
]
Expand All @@ -39,7 +38,7 @@ def run(self):
{
"cmd": parse_cmd(self.args.command),
"fname": kwargs.pop("file"),
"no_exec": self.args.no_exec or bool(self.args.checkpoints),
"no_exec": self.args.no_exec,
"run_cache": not kwargs.pop("no_run_cache"),
}
)
Expand Down
11 changes: 0 additions & 11 deletions dvc/commands/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,6 @@ def _add_common_args(parser):
help="Declare output file or directory (do not put into DVC cache).",
metavar="<filename>",
).complete = completion.FILE
parser.add_argument(
"-c",
"--checkpoints",
action="append",
default=[],
help=(
"Declare checkpoint output file or directory for 'dvc exp run'. "
"Not compatible with 'dvc repro'."
),
metavar="<filename>",
).complete = completion.FILE
parser.add_argument(
"--external",
action="store_true",
Expand Down
2 changes: 0 additions & 2 deletions dvc/env.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
DVC_CHECKPOINT = "DVC_CHECKPOINT"
DVC_DAEMON = "DVC_DAEMON"
DVC_PAGER = "DVC_PAGER"
DVC_ROOT = "DVC_ROOT"
DVCLIVE_RESUME = "DVCLIVE_RESUME"
DVC_IGNORE_ISATTY = "DVC_IGNORE_ISATTY"
DVC_EXP_BASELINE_REV = "DVC_EXP_BASELINE_REV"
DVC_EXP_NAME = "DVC_EXP_NAME"
Expand Down
20 changes: 1 addition & 19 deletions dvc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ def loadd_from(stage, d_list):
metric = d.pop(Output.PARAM_METRIC, False)
plot = d.pop(Output.PARAM_PLOT, False)
persist = d.pop(Output.PARAM_PERSIST, False)
checkpoint = d.pop(Output.PARAM_CHECKPOINT, False)
remote = d.pop(Output.PARAM_REMOTE, None)
annot = {field: d.pop(field, None) for field in ANNOTATION_FIELDS}
files = d.pop(Output.PARAM_FILES, None)
Expand All @@ -105,7 +104,6 @@ def loadd_from(stage, d_list):
metric=metric,
plot=plot,
persist=persist,
checkpoint=checkpoint,
remote=remote,
**annot,
files=files,
Expand All @@ -122,7 +120,6 @@ def loads_from(
metric=False,
plot=False,
persist=False,
checkpoint=False,
remote=None,
push=True,
):
Expand All @@ -135,7 +132,6 @@ def loads_from(
metric=metric,
plot=plot,
persist=persist,
checkpoint=checkpoint,
remote=remote,
push=push,
)
Expand Down Expand Up @@ -191,7 +187,6 @@ def load_from_pipeline(stage, data, typ="outs"):
[
Output.PARAM_CACHE,
Output.PARAM_PERSIST,
Output.PARAM_CHECKPOINT,
Output.PARAM_REMOTE,
Output.PARAM_PUSH,
*ANNOTATION_FIELDS,
Expand Down Expand Up @@ -281,7 +276,6 @@ class Output:

PARAM_PATH = "path"
PARAM_CACHE = "cache"
PARAM_CHECKPOINT = "checkpoint"
PARAM_FILES = "files"
PARAM_METRIC = "metric"
PARAM_METRIC_TYPE = "type"
Expand Down Expand Up @@ -322,7 +316,6 @@ def __init__( # noqa: PLR0913
metric=False,
plot=False,
persist=False,
checkpoint=False,
desc=None,
type=None, # noqa: A002, pylint: disable=redefined-builtin
labels=None,
Expand Down Expand Up @@ -389,7 +382,6 @@ def __init__( # noqa: PLR0913
self.metric = False if self.IS_DEPENDENCY else metric
self.plot = False if self.IS_DEPENDENCY else plot
self.persist = persist
self.checkpoint = checkpoint
self.can_push = push

self.fs_path = self._parse_path(self.fs, fs_path)
Expand Down Expand Up @@ -816,9 +808,6 @@ def dumpd(self, **kwargs): # noqa: C901, PLR0912
if self.persist:
ret[self.PARAM_PERSIST] = self.persist

if self.checkpoint:
ret[self.PARAM_CHECKPOINT] = self.checkpoint

if self.remote:
ret[self.PARAM_REMOTE] = self.remote

Expand Down Expand Up @@ -888,7 +877,6 @@ def checkout(
relink: bool = False,
filter_info: Optional[str] = None,
allow_missing: bool = False,
checkpoint_reset: bool = False,
**kwargs,
) -> Optional[Tuple[bool, Optional[bool]]]:
if not self.use_cache:
Expand All @@ -901,11 +889,6 @@ def checkout(
# backward compatibility
return None

if self.checkpoint and checkpoint_reset:
if self.exists:
self.remove()
return None

added = not self.exists

try:
Expand All @@ -922,7 +905,7 @@ def checkout(
**kwargs,
)
except CheckoutError:
if allow_missing or self.checkpoint:
if allow_missing:
return None
raise
self.set_exec()
Expand Down Expand Up @@ -1270,7 +1253,6 @@ def _merge_dir_version_meta(self, other: "Output"):
Required(Output.PARAM_PATH): str,
Output.PARAM_PLOT: bool,
Output.PARAM_PERSIST: bool,
Output.PARAM_CHECKPOINT: bool,
Output.PARAM_CLOUD: CLOUD_SCHEMA,
}

Expand Down
Loading

0 comments on commit 4e03644

Please sign in to comment.