Skip to content

Commit

Permalink
experiments: checkpoints proof of concept (iterative#4591)
Browse files Browse the repository at this point in the history
* stage: make runs checkpointable

* api: add make_checkpoint API call

* tests: add test for checkpointed stage

* experiments: implement checkpoints callback chain

* initial working state

* use .dvc/tmp for checkpoint signal file location

* force repro for checkpoint experiments

* checkout: add flag to allow missing persistent outputs

(i.e. not yet created)

* use allow_persist_missing on checkpoint experiment repro

* executor: force checkout before checkpoints experiment repro

* move checkpoint commands to `dvc exp run`

* support resuming checkpoint runs with --continue

* fix git bugs

* experiments: include checkpoint commits in `exp show`

* use 1sec sleep timers

* fix apply_workspace test conflict

* tests: add test for `dvc exp run`

* cleanup signal file behavior

* fix bool conversion

* tests: add tests for checkpoint and checkpoint_continue

* fix styling

* fix checkpoint stage test

* fix checkpoint monitor thread condition/notification
  • Loading branch information
pmrowla authored Oct 6, 2020
1 parent 7424f22 commit f0a729d
Show file tree
Hide file tree
Showing 15 changed files with 738 additions and 70 deletions.
31 changes: 31 additions & 0 deletions dvc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,34 @@ def _make_repo(repo_url=None, rev=None):
pass # fallthrough to external_repo
with external_repo(url=repo_url, rev=rev) as repo:
yield repo


def make_checkpoint():
"""
Signal DVC to create a checkpoint experiment.
If the current process is being run from DVC, this function will block
until DVC has finished creating the checkpoint. Otherwise, this function
will return immediately.
"""
import builtins
from time import sleep

from dvc.stage.run import CHECKPOINT_SIGNAL_FILE

if os.getenv("DVC_CHECKPOINT") is None:
return

root_dir = Repo.find_root()
signal_file = os.path.join(
root_dir, Repo.DVC_DIR, "tmp", CHECKPOINT_SIGNAL_FILE
)

with builtins.open(signal_file, "w") as fobj:
# NOTE: force flushing/writing empty file to disk, otherwise when
# run in certain contexts (pytest) file may not actually be written
fobj.write("")
fobj.flush()
os.fsync(fobj.fileno())
while os.path.exists(signal_file):
sleep(1)
234 changes: 230 additions & 4 deletions dvc/command/experiments.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import argparse
import io
import logging
import os
from collections import OrderedDict
from datetime import date
from itertools import groupby
from typing import Iterable, Optional

from dvc.command import completion
from dvc.command.base import CmdBase, append_doc_link, fix_subparsers
from dvc.command.metrics import DEFAULT_PRECISION
from dvc.command.metrics import DEFAULT_PRECISION, _show_metrics
from dvc.command.status import CmdDataStatus
from dvc.dvcfile import PIPELINE_FILE
from dvc.exceptions import DvcException, InvalidArgumentError
from dvc.utils.flatten import flatten

Expand Down Expand Up @@ -109,19 +113,30 @@ def _collect_rows(
reverse = sort_order == "desc"
experiments = _sort_exp(experiments, sort_by, sort_type, reverse)

last_tip = None
for i, (rev, exp) in enumerate(experiments.items()):
row = []
style = None
queued = "*" if exp.get("queued", False) else ""

tip = exp.get("checkpoint_tip")
if rev == "baseline":
name = exp.get("name", base_rev)
row.append(f"{name}")
style = "bold"
elif i < len(experiments) - 1:
row.append(f"├── {queued}{rev[:7]}")
else:
row.append(f"└── {queued}{rev[:7]}")
if tip and tip == last_tip:
tree = "│ ╟"
else:
if i < len(experiments) - 1:
if tip:
tree = "├─╥"
else:
tree = "├──"
else:
tree = "└──"
row.append(f"{tree} {queued}{rev[:7]}")
last_tip = tip

if not no_timestamp:
row.append(_format_time(exp.get("timestamp")))
Expand Down Expand Up @@ -373,6 +388,64 @@ def run(self):
return 0


class CmdExperimentsRun(CmdBase):
def run(self):
if not self.repo.experiments:
return 0

saved_dir = os.path.realpath(os.curdir)
os.chdir(self.args.cwd)

# Dirty hack so the for loop below can at least enter once
if self.args.all_pipelines:
self.args.targets = [None]
elif not self.args.targets:
self.args.targets = self.default_targets

ret = 0
for target in self.args.targets:
try:
stages = self.repo.reproduce(
target,
single_item=self.args.single_item,
force=self.args.force,
dry=self.args.dry,
interactive=self.args.interactive,
pipeline=self.args.pipeline,
all_pipelines=self.args.all_pipelines,
run_cache=not self.args.no_run_cache,
no_commit=self.args.no_commit,
downstream=self.args.downstream,
recursive=self.args.recursive,
force_downstream=self.args.force_downstream,
experiment=True,
queue=self.args.queue,
run_all=self.args.run_all,
jobs=self.args.jobs,
params=self.args.params,
checkpoint=(
self.args.checkpoint
or self.args.checkpoint_continue is not None
),
checkpoint_continue=self.args.checkpoint_continue,
)

if len(stages) == 0:
logger.info(CmdDataStatus.UP_TO_DATE_MSG)

if self.args.metrics:
metrics = self.repo.metrics.show()
logger.info(_show_metrics(metrics))

except DvcException:
logger.exception("")
ret = 1
break

os.chdir(saved_dir)
return ret


def add_parser(subparsers, parent_parser):
EXPERIMENTS_HELP = "Commands to display and compare experiments."

Expand Down Expand Up @@ -552,3 +625,156 @@ def add_parser(subparsers, parent_parser):
metavar="<n>",
)
experiments_diff_parser.set_defaults(func=CmdExperimentsDiff)

EXPERIMENTS_RUN_HELP = (
"Reproduce complete or partial experiment pipelines."
)
experiments_run_parser = experiments_subparsers.add_parser(
"run",
parents=[parent_parser],
description=append_doc_link(EXPERIMENTS_RUN_HELP, "experiments/run"),
help=EXPERIMENTS_RUN_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
experiments_run_parser.add_argument(
"targets",
nargs="*",
help=f"Stages to reproduce. '{PIPELINE_FILE}' by default.",
).complete = completion.DVC_FILE
experiments_run_parser.add_argument(
"-f",
"--force",
action="store_true",
default=False,
help="Reproduce even if dependencies were not changed.",
)
experiments_run_parser.add_argument(
"-s",
"--single-item",
action="store_true",
default=False,
help="Reproduce only single data item without recursive dependencies "
"check.",
)
experiments_run_parser.add_argument(
"-c",
"--cwd",
default=os.path.curdir,
help="Directory within your repo to reproduce from. Note: deprecated "
"by `dvc --cd <path>`.",
metavar="<path>",
)
experiments_run_parser.add_argument(
"-m",
"--metrics",
action="store_true",
default=False,
help="Show metrics after reproduction.",
)
experiments_run_parser.add_argument(
"--dry",
action="store_true",
default=False,
help="Only print the commands that would be executed without "
"actually executing.",
)
experiments_run_parser.add_argument(
"-i",
"--interactive",
action="store_true",
default=False,
help="Ask for confirmation before reproducing each stage.",
)
experiments_run_parser.add_argument(
"-p",
"--pipeline",
action="store_true",
default=False,
help="Reproduce the whole pipeline that the specified stage file "
"belongs to.",
)
experiments_run_parser.add_argument(
"-P",
"--all-pipelines",
action="store_true",
default=False,
help="Reproduce all pipelines in the repo.",
)
experiments_run_parser.add_argument(
"-R",
"--recursive",
action="store_true",
default=False,
help="Reproduce all stages in the specified directory.",
)
experiments_run_parser.add_argument(
"--no-run-cache",
action="store_true",
default=False,
help=(
"Execute stage commands even if they have already been run with "
"the same command/dependencies/outputs/etc before."
),
)
experiments_run_parser.add_argument(
"--force-downstream",
action="store_true",
default=False,
help="Reproduce all descendants of a changed stage even if their "
"direct dependencies didn't change.",
)
experiments_run_parser.add_argument(
"--no-commit",
action="store_true",
default=False,
help="Don't put files/directories into cache.",
)
experiments_run_parser.add_argument(
"--downstream",
action="store_true",
default=False,
help="Start from the specified stages when reproducing pipelines.",
)
experiments_run_parser.add_argument(
"--params",
action="append",
default=[],
help="Use the specified param values when reproducing pipelines.",
metavar="[<filename>:]<params_list>",
)
experiments_run_parser.add_argument(
"--queue",
action="store_true",
default=False,
help="Stage this experiment in the run queue for future execution.",
)
experiments_run_parser.add_argument(
"--run-all",
action="store_true",
default=False,
help="Execute all experiments in the run queue.",
)
experiments_run_parser.add_argument(
"-j",
"--jobs",
type=int,
help="Run the specified number of experiments at a time in parallel.",
metavar="<number>",
)
experiments_run_parser.add_argument(
"--checkpoint",
action="store_true",
default=False,
help="Reproduce pipelines as a checkpoint experiment.",
)
experiments_run_parser.add_argument(
"--continue",
nargs=1,
default=None,
dest="checkpoint_continue",
help=(
"Continue from the specified checkpoint experiment"
"(implies --checkpoint)."
),
)
experiments_run_parser.set_defaults(func=CmdExperimentsRun)
23 changes: 15 additions & 8 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import dvc.prompt as prompt
from dvc.cache import NamedCache
from dvc.exceptions import (
CheckoutError,
CollectCacheError,
DvcException,
MergeError,
Expand Down Expand Up @@ -325,6 +326,7 @@ def checkout(
progress_callback=None,
relink=False,
filter_info=None,
allow_persist_missing=False,
):
if not self.use_cache:
if progress_callback:
Expand All @@ -333,14 +335,19 @@ def checkout(
)
return None

return self.cache.checkout(
self.path_info,
self.hash_info,
force=force,
progress_callback=progress_callback,
relink=relink,
filter_info=filter_info,
)
try:
return self.cache.checkout(
self.path_info,
self.hash_info,
force=force,
progress_callback=progress_callback,
relink=relink,
filter_info=filter_info,
)
except CheckoutError:
if self.persist and allow_persist_missing:
return None
raise

def remove(self, ignore_remove=False):
self.tree.remove(self.path_info)
Expand Down
2 changes: 2 additions & 0 deletions dvc/repo/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def checkout(
force=False,
relink=False,
recursive=False,
allow_persist_missing=False,
):
from dvc.stage.exceptions import (
StageFileBadNameError,
Expand Down Expand Up @@ -96,6 +97,7 @@ def checkout(
progress_callback=pbar.update_msg,
relink=relink,
filter_info=filter_info,
allow_persist_missing=allow_persist_missing,
)
for key, items in result.items():
stats[key].extend(_fspath_dir(path) for path in items)
Expand Down
Loading

0 comments on commit f0a729d

Please sign in to comment.