Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experiments: checkpoints proof of concept #4591

Merged
merged 23 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f892818
stage: make runs checkpointable
pmrowla Sep 17, 2020
e240e1c
api: add make_checkpoint API call
pmrowla Sep 17, 2020
6ce65b1
tests: add test for checkpointed stage
pmrowla Sep 17, 2020
ec04783
experiments: implement checkpoints callback chain
pmrowla Sep 22, 2020
998247c
initial working state
pmrowla Sep 22, 2020
68f9a87
use .dvc/tmp for checkpoint signal file location
pmrowla Sep 24, 2020
3fe2232
force repro for checkpoint experiments
pmrowla Sep 25, 2020
91225e7
checkout: add flag to allow missing persistent outputs
pmrowla Sep 25, 2020
d756dbc
use allow_persist_missing on checkpoint experiment repro
pmrowla Sep 25, 2020
ae15b7b
executor: force checkout before checkpoints experiment repro
pmrowla Oct 5, 2020
76ae162
move checkpoint commands to `dvc exp run`
pmrowla Oct 5, 2020
add73f7
support resuming checkpoint runs with --continue
pmrowla Oct 5, 2020
cf11f21
fix git bugs
pmrowla Oct 5, 2020
9ef119e
experiments: include checkpoint commits in `exp show`
pmrowla Oct 6, 2020
0269c8b
use 1sec sleep timers
pmrowla Oct 6, 2020
c8a322b
fix apply_workspace test conflict
pmrowla Oct 6, 2020
60345f5
tests: add test for `dvc exp run`
pmrowla Oct 6, 2020
3600a9f
cleanup signal file behavior
pmrowla Oct 6, 2020
d1a9648
fix bool conversion
pmrowla Oct 6, 2020
169f5c1
tests: add tests for checkpoint and checkpoint_continue
pmrowla Oct 6, 2020
7f788e8
fix styling
pmrowla Oct 6, 2020
36f0ff4
fix checkpoint stage test
pmrowla Oct 6, 2020
53a2c5a
fix checkpoint monitor thread condition/notification
pmrowla Oct 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions dvc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,34 @@ def _make_repo(repo_url=None, rev=None):
pass # fallthrough to external_repo
with external_repo(url=repo_url, rev=rev) as repo:
yield repo


def make_checkpoint():
"""
Signal DVC to create a checkpoint experiment.

If the current process is being run from DVC, this function will block
until DVC has finished creating the checkpoint. Otherwise, this function
will return immediately.
"""
import builtins
from time import sleep

from dvc.stage.run import CHECKPOINT_SIGNAL_FILE

if os.getenv("DVC_CHECKPOINT") is None:
return

root_dir = Repo.find_root()
signal_file = os.path.join(
root_dir, Repo.DVC_DIR, "tmp", CHECKPOINT_SIGNAL_FILE
)

with builtins.open(signal_file, "w") as fobj:
# NOTE: force flushing/writing empty file to disk, otherwise when
# run in certain contexts (pytest) file may not actually be written
fobj.write("")
fobj.flush()
os.fsync(fobj.fileno())
while os.path.exists(signal_file):
sleep(1)
234 changes: 230 additions & 4 deletions dvc/command/experiments.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import argparse
import io
import logging
import os
from collections import OrderedDict
from datetime import date
from itertools import groupby
from typing import Iterable, Optional

from dvc.command import completion
from dvc.command.base import CmdBase, append_doc_link, fix_subparsers
from dvc.command.metrics import DEFAULT_PRECISION
from dvc.command.metrics import DEFAULT_PRECISION, _show_metrics
from dvc.command.status import CmdDataStatus
from dvc.dvcfile import PIPELINE_FILE
from dvc.exceptions import DvcException, InvalidArgumentError
from dvc.utils.flatten import flatten

Expand Down Expand Up @@ -109,19 +113,30 @@ def _collect_rows(
reverse = sort_order == "desc"
experiments = _sort_exp(experiments, sort_by, sort_type, reverse)

last_tip = None
for i, (rev, exp) in enumerate(experiments.items()):
row = []
style = None
queued = "*" if exp.get("queued", False) else ""

tip = exp.get("checkpoint_tip")
if rev == "baseline":
name = exp.get("name", base_rev)
row.append(f"{name}")
style = "bold"
elif i < len(experiments) - 1:
row.append(f"β”œβ”€β”€ {queued}{rev[:7]}")
else:
row.append(f"└── {queued}{rev[:7]}")
if tip and tip == last_tip:
tree = "β”‚ β•Ÿ"
else:
if i < len(experiments) - 1:
if tip:
tree = "β”œβ”€β•₯"
else:
tree = "β”œβ”€β”€"
else:
tree = "└──"
row.append(f"{tree} {queued}{rev[:7]}")
last_tip = tip

if not no_timestamp:
row.append(_format_time(exp.get("timestamp")))
Expand Down Expand Up @@ -373,6 +388,64 @@ def run(self):
return 0


class CmdExperimentsRun(CmdBase):
def run(self):
if not self.repo.experiments:
return 0

saved_dir = os.path.realpath(os.curdir)
os.chdir(self.args.cwd)

# Dirty hack so the for loop below can at least enter once
if self.args.all_pipelines:
self.args.targets = [None]
elif not self.args.targets:
self.args.targets = self.default_targets

ret = 0
for target in self.args.targets:
try:
stages = self.repo.reproduce(
target,
single_item=self.args.single_item,
force=self.args.force,
dry=self.args.dry,
interactive=self.args.interactive,
pipeline=self.args.pipeline,
all_pipelines=self.args.all_pipelines,
run_cache=not self.args.no_run_cache,
no_commit=self.args.no_commit,
downstream=self.args.downstream,
recursive=self.args.recursive,
force_downstream=self.args.force_downstream,
experiment=True,
queue=self.args.queue,
run_all=self.args.run_all,
jobs=self.args.jobs,
params=self.args.params,
checkpoint=(
self.args.checkpoint
or self.args.checkpoint_continue is not None
),
checkpoint_continue=self.args.checkpoint_continue,
)

if len(stages) == 0:
logger.info(CmdDataStatus.UP_TO_DATE_MSG)

if self.args.metrics:
metrics = self.repo.metrics.show()
logger.info(_show_metrics(metrics))

except DvcException:
logger.exception("")
ret = 1
break

os.chdir(saved_dir)
return ret


def add_parser(subparsers, parent_parser):
EXPERIMENTS_HELP = "Commands to display and compare experiments."

Expand Down Expand Up @@ -552,3 +625,156 @@ def add_parser(subparsers, parent_parser):
metavar="<n>",
)
experiments_diff_parser.set_defaults(func=CmdExperimentsDiff)

EXPERIMENTS_RUN_HELP = (
"Reproduce complete or partial experiment pipelines."
)
experiments_run_parser = experiments_subparsers.add_parser(
Comment on lines +628 to +632
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all experiments run/repro behavior will be moved from dvc repro to dvc exp run in a follow-up PR, for now repro -e is just duplicated here so that checkpoint runs work correctly

"run",
parents=[parent_parser],
description=append_doc_link(EXPERIMENTS_RUN_HELP, "experiments/run"),
help=EXPERIMENTS_RUN_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
experiments_run_parser.add_argument(
"targets",
nargs="*",
help=f"Stages to reproduce. '{PIPELINE_FILE}' by default.",
).complete = completion.DVC_FILE
experiments_run_parser.add_argument(
"-f",
"--force",
action="store_true",
default=False,
help="Reproduce even if dependencies were not changed.",
)
experiments_run_parser.add_argument(
"-s",
"--single-item",
action="store_true",
default=False,
help="Reproduce only single data item without recursive dependencies "
"check.",
)
experiments_run_parser.add_argument(
"-c",
"--cwd",
default=os.path.curdir,
help="Directory within your repo to reproduce from. Note: deprecated "
"by `dvc --cd <path>`.",
metavar="<path>",
)
experiments_run_parser.add_argument(
"-m",
"--metrics",
action="store_true",
default=False,
help="Show metrics after reproduction.",
)
experiments_run_parser.add_argument(
"--dry",
action="store_true",
default=False,
help="Only print the commands that would be executed without "
"actually executing.",
)
experiments_run_parser.add_argument(
"-i",
"--interactive",
action="store_true",
default=False,
help="Ask for confirmation before reproducing each stage.",
)
experiments_run_parser.add_argument(
"-p",
"--pipeline",
action="store_true",
default=False,
help="Reproduce the whole pipeline that the specified stage file "
"belongs to.",
)
experiments_run_parser.add_argument(
"-P",
"--all-pipelines",
action="store_true",
default=False,
help="Reproduce all pipelines in the repo.",
)
experiments_run_parser.add_argument(
"-R",
"--recursive",
action="store_true",
default=False,
help="Reproduce all stages in the specified directory.",
)
experiments_run_parser.add_argument(
"--no-run-cache",
action="store_true",
default=False,
help=(
"Execute stage commands even if they have already been run with "
"the same command/dependencies/outputs/etc before."
),
)
experiments_run_parser.add_argument(
"--force-downstream",
action="store_true",
default=False,
help="Reproduce all descendants of a changed stage even if their "
"direct dependencies didn't change.",
)
experiments_run_parser.add_argument(
"--no-commit",
action="store_true",
default=False,
help="Don't put files/directories into cache.",
)
experiments_run_parser.add_argument(
"--downstream",
action="store_true",
default=False,
help="Start from the specified stages when reproducing pipelines.",
)
experiments_run_parser.add_argument(
"--params",
action="append",
default=[],
help="Use the specified param values when reproducing pipelines.",
metavar="[<filename>:]<params_list>",
)
experiments_run_parser.add_argument(
"--queue",
action="store_true",
default=False,
help="Stage this experiment in the run queue for future execution.",
)
experiments_run_parser.add_argument(
"--run-all",
action="store_true",
default=False,
help="Execute all experiments in the run queue.",
)
experiments_run_parser.add_argument(
"-j",
"--jobs",
type=int,
help="Run the specified number of experiments at a time in parallel.",
metavar="<number>",
)
experiments_run_parser.add_argument(
"--checkpoint",
action="store_true",
default=False,
help="Reproduce pipelines as a checkpoint experiment.",
)
experiments_run_parser.add_argument(
"--continue",
nargs=1,
default=None,
dest="checkpoint_continue",
help=(
"Continue from the specified checkpoint experiment"
"(implies --checkpoint)."
),
)
experiments_run_parser.set_defaults(func=CmdExperimentsRun)
23 changes: 15 additions & 8 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import dvc.prompt as prompt
from dvc.cache import NamedCache
from dvc.exceptions import (
CheckoutError,
CollectCacheError,
DvcException,
MergeError,
Expand Down Expand Up @@ -325,6 +326,7 @@ def checkout(
progress_callback=None,
relink=False,
filter_info=None,
allow_persist_missing=False,
):
if not self.use_cache:
if progress_callback:
Expand All @@ -333,14 +335,19 @@ def checkout(
)
return None

return self.cache.checkout(
self.path_info,
self.hash_info,
force=force,
progress_callback=progress_callback,
relink=relink,
filter_info=filter_info,
)
try:
return self.cache.checkout(
self.path_info,
self.hash_info,
force=force,
progress_callback=progress_callback,
relink=relink,
filter_info=filter_info,
)
except CheckoutError:
if self.persist and allow_persist_missing:
return None
raise

def remove(self, ignore_remove=False):
self.tree.remove(self.path_info)
Expand Down
2 changes: 2 additions & 0 deletions dvc/repo/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def checkout(
force=False,
relink=False,
recursive=False,
allow_persist_missing=False,
):
from dvc.stage.exceptions import (
StageFileBadNameError,
Expand Down Expand Up @@ -96,6 +97,7 @@ def checkout(
progress_callback=pbar.update_msg,
relink=relink,
filter_info=filter_info,
allow_persist_missing=allow_persist_missing,
)
for key, items in result.items():
stats[key].extend(_fspath_dir(path) for path in items)
Expand Down
Loading