diff --git a/dvc/command/experiments.py b/dvc/command/experiments.py index ab1daa3c7f..a8d591f4ad 100644 --- a/dvc/command/experiments.py +++ b/dvc/command/experiments.py @@ -48,6 +48,10 @@ def _round(val): return val def _extend(row, names, items): + if not items: + row.extend(["-"] * len(names)) + return + for fname, item in items: if isinstance(item, dict): item = flatten(item, ".") @@ -62,13 +66,14 @@ def _extend(row, names, items): for i, (rev, exp) in enumerate(experiments.items()): row = [] style = None + queued = "*" if exp.get("queued", False) else "" if rev == "baseline": row.append(f"{base_rev}") style = "bold" elif i < len(experiments) - 1: - row.append(f"├── {rev[:7]}") + row.append(f"├── {queued}{rev[:7]}") else: - row.append(f"└── {rev[:7]}") + row.append(f"└── {queued}{rev[:7]}") _extend(row, metric_names, exp.get("metrics", {}).items()) _extend(row, param_names, exp.get("params", {}).items()) @@ -229,6 +234,7 @@ def add_parser(subparsers, parent_parser): experiments_parser = subparsers.add_parser( "experiments", parents=[parent_parser], + aliases=["exp"], description=append_doc_link(EXPERIMENTS_HELP, "experiments"), formatter_class=argparse.RawDescriptionHelpFormatter, ) diff --git a/dvc/command/repro.py b/dvc/command/repro.py index 2b06ec03e5..ecad1c4a2d 100644 --- a/dvc/command/repro.py +++ b/dvc/command/repro.py @@ -41,6 +41,9 @@ def run(self): recursive=self.args.recursive, force_downstream=self.args.force_downstream, experiment=self.args.experiment, + queue=self.args.queue, + run_all=self.args.run_all, + jobs=self.args.jobs, ) if len(stages) == 0: @@ -174,4 +177,16 @@ def add_parser(subparsers, parent_parser): default=False, help=argparse.SUPPRESS, ) + repro_parser.add_argument( + "--queue", action="store_true", default=False, help=argparse.SUPPRESS + ) + repro_parser.add_argument( + "--run-all", + action="store_true", + default=False, + help=argparse.SUPPRESS, + ) + repro_parser.add_argument( + "-j", "--jobs", type=int, help=argparse.SUPPRESS, metavar="" + ) repro_parser.set_defaults(func=CmdRepro) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index dcbe550354..fe899762d6 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -1,21 +1,33 @@ import logging import os +import re import tempfile -from contextlib import contextmanager +from concurrent.futures import ProcessPoolExecutor, as_completed +from typing import Iterable, Optional from funcy import cached_property from dvc.exceptions import DvcException +from dvc.repo.experiments.executor import ExperimentExecutor, LocalExecutor from dvc.scm.git import Git from dvc.stage.serialize import to_lockfile from dvc.utils import dict_sha256, env2bool, relpath -from dvc.utils.fs import remove +from dvc.utils.fs import copyfile, remove logger = logging.getLogger(__name__) +def hash_exp(stages): + exp_data = {} + for stage in stages: + exp_data.update(to_lockfile(stage)) + return dict_sha256(exp_data) + + class UnchangedExperimentError(DvcException): - pass + def __init__(self, rev): + super().__init__("Experiment identical to baseline '{rev[:7]}'.") + self.rev = rev class Experiments: @@ -26,6 +38,11 @@ class Experiments: """ EXPERIMENTS_DIR = "experiments" + PACKED_ARGS_FILE = "repro.dat" + STASH_MSG_PREFIX = "dvc-exp-" + STASH_EXPERIMENT_RE = re.compile( + r"(?:On \(.*\): )dvc-exp-(?P[0-9a-f]+)$" + ) def __init__(self, repo): if not ( @@ -47,10 +64,13 @@ def scm(self): return Git(self.exp_dir) return self._init_clone() + @cached_property + def dvc_dir(self): + return relpath(self.repo.dvc_dir, self.repo.scm.root_dir) + @cached_property def exp_dvc_dir(self): - dvc_dir = relpath(self.repo.dvc_dir, self.repo.scm.root_dir) - return os.path.join(self.exp_dir, dvc_dir) + return os.path.join(self.exp_dir, self.dvc_dir) @cached_property def exp_dvc(self): @@ -59,19 +79,24 @@ def exp_dvc(self): return Repo(self.exp_dvc_dir) - @staticmethod - def exp_hash(stages): - exp_data = {} - for stage in stages: - exp_data.update(to_lockfile(stage)) - return dict_sha256(exp_data) - - @contextmanager - def chdir(self): - cwd = os.getcwd() - os.chdir(self.exp_dvc.root_dir) - yield - os.chdir(cwd) + @cached_property + def args_file(self): + return os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) + + @property + def stash_reflog(self): + if "refs/stash" in self.scm.repo.refs: + return self.scm.repo.refs["refs/stash"].log() + return [] + + @property + def stash_revs(self): + revs = {} + for i, entry in enumerate(self.stash_reflog): + m = self.STASH_EXPERIMENT_RE.match(entry.message) + if m: + revs[entry.newhexsha] = (i, m.group("baseline_rev")) + return revs def _init_clone(self): src_dir = self.repo.scm.root_dir @@ -90,15 +115,18 @@ def _config_clone(self): def _scm_checkout(self, rev): self.scm.repo.git.reset(hard=True) + if self.scm.repo.head.is_detached: + # switch back to default branch + self.scm.repo.heads[0].checkout() if not Git.is_sha(rev) or not self.scm.has_rev(rev): - self.scm.fetch(all=True) + self.scm.pull() logger.debug("Checking out base experiment commit '%s'", rev) self.scm.checkout(rev) - def _patch_exp(self): - """Create a patch based on the current (parent) workspace and apply it - to the experiment workspace. + def _stash_exp(self, *args, **kwargs): + """Stash changes from the current (parent) workspace as an experiment. """ + rev = self.scm.get_rev() tmp = tempfile.NamedTemporaryFile(delete=False).name try: self.repo.scm.repo.git.diff(patch=True, output=tmp) @@ -106,16 +134,28 @@ def _patch_exp(self): logger.debug("Patching experiment workspace") self.scm.repo.git.apply(tmp) else: - raise UnchangedExperimentError( - "Experiment identical to baseline commit." - ) + raise UnchangedExperimentError(rev) finally: remove(tmp) + self._pack_args(*args, **kwargs) + msg = f"{self.STASH_MSG_PREFIX}{rev}" + self.scm.repo.git.stash("push", "-m", msg) + return self.scm.resolve_rev("stash@{0}") + + def _pack_args(self, *args, **kwargs): + ExperimentExecutor.pack_repro_args(self.args_file, *args, **kwargs) + self.scm.add(self.args_file) - def _commit(self, stages, check_exists=True, branch=True, rev=None): + def _unpack_args(self, tree=None): + args_file = os.path.join(self.exp_dvc.tmp_dir, self.PACKED_ARGS_FILE) + return ExperimentExecutor.unpack_repro_args(args_file, tree=tree) + + def _commit(self, exp_hash, check_exists=True, branch=True): """Commit stages as an experiment and return the commit SHA.""" - hash_ = self.exp_hash(stages) - exp_name = f"{rev[:7]}-{hash_}" + if not self.scm.is_dirty(): + raise UnchangedExperimentError(self.scm.get_rev()) + rev = self.scm.get_rev() + exp_name = f"{rev[:7]}-{exp_hash}" if branch: if check_exists and exp_name in self.scm.list_branches(): logger.debug("Using existing experiment branch '%s'", exp_name) @@ -126,10 +166,30 @@ def _commit(self, stages, check_exists=True, branch=True, rev=None): self.scm.commit(f"Add experiment {exp_name}") return self.scm.get_rev() - def _reproduce(self, *args, **kwargs): - """Run `dvc repro` inside the experiments workspace.""" - with self.chdir(): - return self.exp_dvc.reproduce(*args, **kwargs) + def reproduce_one(self, queue=False, **kwargs): + """Reproduce and checkout a single experiment.""" + stash_rev = self.new(**kwargs) + if queue: + logger.info( + "Queued experiment '%s' for future execution.", stash_rev[:7] + ) + return [stash_rev] + results = self.reproduce([stash_rev], keep_stash=False) + for exp_rev in results: + self.checkout_exp(exp_rev, force=True) + return results + + def reproduce_queued(self, **kwargs): + results = self.reproduce(**kwargs) + if results: + revs = [f"{rev[:7]}" for rev in results] + logger.info( + "Successfully reproduced experiment(s) '%s'.\n" + "Use `dvc exp checkout ` to apply the results of " + "a specific experiment to your workspace.", + ", ".join(revs), + ) + return results def new(self, *args, workspace=True, **kwargs): """Create a new experiment. @@ -141,19 +201,142 @@ def new(self, *args, workspace=True, **kwargs): self._scm_checkout(rev) if workspace: try: - self._patch_exp() + stash_rev = self._stash_exp(*args, **kwargs) except UnchangedExperimentError as exc: logger.info("Reproducing existing experiment '%s'.", rev[:7]) raise exc else: # configure params via command line here pass - self.exp_dvc.checkout() - stages = self._reproduce(*args, **kwargs) - exp_rev = self._commit(stages, rev=rev) - self.checkout_exp(exp_rev, force=True) - logger.info("Generated experiment '%s'.", exp_rev[:7]) - return stages + logger.debug( + "Stashed experiment '%s' for future execution.", stash_rev[:7] + ) + return stash_rev + + def reproduce( + self, + revs: Optional[Iterable] = None, + keep_stash: Optional[bool] = True, + **kwargs, + ): + """Reproduce the specified experiments. + + Args: + revs: If revs is not specified, all stashed experiments will be + reproduced. + keep_stash: If True, stashed experiments will be preserved if they + fail to reproduce successfully. + """ + stash_revs = self.stash_revs + + # to_run contains mapping of: + # input_rev: (stash_index, baseline_rev) + # where input_rev contains the changes to execute (usually a stash + # commit) and baseline_rev is the baseline to compare output against. + # The final experiment commit will be branched from baseline_rev. + if revs is None: + to_run = { + rev: baseline_rev + for rev, (_, baseline_rev) in stash_revs.items() + } + else: + to_run = { + rev: stash_revs[rev][1] if rev in stash_revs else rev + for rev in revs + } + + # setup executors + executors = {} + for rev, baseline_rev in to_run.items(): + tree = self.scm.get_tree(rev) + repro_args, repro_kwargs = self._unpack_args(tree) + executor = LocalExecutor( + tree, + baseline_rev, + repro_args=repro_args, + repro_kwargs=repro_kwargs, + dvc_dir=self.dvc_dir, + cache_dir=self.repo.cache.local.cache_dir, + ) + executors[rev] = executor + + exec_results = self._reproduce(executors, **kwargs) + + if keep_stash: + # only drop successfully run stashed experiments + to_drop = sorted( + ( + stash_revs[rev][0] + for rev in exec_results + if rev in stash_revs + ), + reverse=True, + ) + else: + # drop all stashed experiments + to_drop = sorted( + (stash_revs[rev][0] for rev in to_run if rev in stash_revs), + reverse=True, + ) + for index in to_drop: + self.scm.repo.git.stash("drop", index) + + result = {} + for _, exp_result in exec_results.items(): + result.update(exp_result) + return result + + def _reproduce(self, executors: dict, jobs: Optional[int] = 1) -> dict: + """Run dvc repro for the specified ExperimentExecutors in parallel. + + Returns dict containing successfully executed experiments. + """ + result = {} + + with ProcessPoolExecutor(max_workers=jobs) as workers: + futures = {} + for rev, executor in executors.items(): + future = workers.submit( + executor.reproduce, + executor.dvc_dir, + cwd=executor.dvc.root_dir, + **executor.repro_kwargs, + ) + futures[future] = (rev, executor) + for future in as_completed(futures): + rev, executor = futures[future] + exc = future.exception() + if exc is None: + exp_hash = future.result() + logger.debug(f"ran exp based on {executor.baseline_rev}") + self._scm_checkout(executor.baseline_rev) + self._collect_output(executor.baseline_rev, executor) + remove(self.args_file) + try: + exp_rev = self._commit(exp_hash) + except UnchangedExperimentError: + logger.debug( + "Experiment '%s' identical to baseline '%s'", + rev, + executor.baseline_rev, + ) + exp_rev = executor.baseline_rev + logger.info("Reproduced experiment '%s'.", exp_rev[:7]) + result[rev] = {exp_rev: exp_hash} + else: + logger.exception( + "Failed to reproduce experiment '%s'", rev + ) + executor.cleanup() + + return result + + def _collect_output(self, rev: str, executor: ExperimentExecutor): + logger.debug("copying tmp output from '%s'", executor.tmp_dir) + tree = self.scm.get_tree(rev) + for fname in tree.walk_files(tree.tree_root): + src = executor.path_info / relpath(fname, tree.tree_root) + copyfile(src, fname) def checkout_exp(self, rev, force=False): """Checkout an experiment to the user's workspace.""" @@ -162,7 +345,6 @@ def checkout_exp(self, rev, force=False): if force: self.repo.scm.repo.git.reset(hard=True) - logger.debug(f"checkout {rev}") self._scm_checkout(rev) tmp = tempfile.NamedTemporaryFile(delete=False).name diff --git a/dvc/repo/experiments/executor.py b/dvc/repo/experiments/executor.py new file mode 100644 index 0000000000..494c389f33 --- /dev/null +++ b/dvc/repo/experiments/executor.py @@ -0,0 +1,132 @@ +import logging +import os +import pickle +from tempfile import TemporaryDirectory + +from funcy import cached_property + +from dvc.path_info import PathInfo +from dvc.stage import PipelineStage +from dvc.tree.base import BaseTree +from dvc.utils import relpath +from dvc.utils.fs import copy_fobj_to_file, makedirs + +logger = logging.getLogger(__name__) + + +class ExperimentExecutor: + """Base class for executing experiments in parallel. + + Args: + src_tree: source tree for this experiment. + baseline_rev: baseline revision that this experiment is derived from. + + Optional keyword args: + repro_args: Args to be passed into reproduce. + repro_kwargs: Keyword args to be passed into reproduce. + """ + + def __init__(self, src_tree: BaseTree, baseline_rev: str, **kwargs): + self.src_tree = src_tree + self.baseline_rev = baseline_rev + self.repro_args = kwargs.pop("repro_args", []) + self.repro_kwargs = kwargs.pop("repro_kwargs", {}) + + def run(self): + pass + + def cleanup(self): + pass + + # TODO: come up with better way to stash repro arguments + @staticmethod + def pack_repro_args(path, *args, tree=None, **kwargs): + open_func = tree.open if tree else open + data = {"args": args, "kwargs": kwargs} + with open_func(path, "wb") as fobj: + pickle.dump(data, fobj) + + @staticmethod + def unpack_repro_args(path, tree=None): + open_func = tree.open if tree else open + with open_func(path, "rb") as fobj: + data = pickle.load(fobj) + return data["args"], data["kwargs"] + + +class LocalExecutor(ExperimentExecutor): + """Local machine exepriment executor.""" + + def __init__(self, src_tree: BaseTree, baseline_rev: str, **kwargs): + dvc_dir = kwargs.pop("dvc_dir") + cache_dir = kwargs.pop("cache_dir") + super().__init__(src_tree, baseline_rev, **kwargs) + self.tmp_dir = TemporaryDirectory() + logger.debug("Init local executor in dir '%s'.", self.tmp_dir) + self.dvc_dir = os.path.join(self.tmp_dir.name, dvc_dir) + try: + for fname in src_tree.walk_files(src_tree.tree_root): + dest = self.path_info / relpath(fname, src_tree.tree_root) + if not os.path.exists(dest.parent): + makedirs(dest.parent) + with src_tree.open(fname, "rb") as fobj: + copy_fobj_to_file(fobj, dest) + except Exception: + self.tmp_dir.cleanup() + raise + self._config(cache_dir) + + def _config(self, cache_dir): + local_config = os.path.join(self.dvc_dir, "config.local") + logger.debug("Writing experiments local config '%s'", local_config) + with open(local_config, "w") as fobj: + fobj.write("[core]\n no_scm = true\n") + fobj.write(f"[cache]\n dir = {cache_dir}") + + @cached_property + def dvc(self): + from dvc.repo import Repo + + return Repo(self.dvc_dir) + + @cached_property + def path_info(self): + return PathInfo(self.tmp_dir.name) + + @staticmethod + def reproduce(dvc_dir, cwd=None, **kwargs): + """Run dvc repro and return the result.""" + from dvc.repo import Repo + from dvc.repo.experiments import hash_exp + + unchanged = [] + + def filter_pipeline(stage): + if isinstance(stage, PipelineStage): + unchanged.append(stage) + + if cwd: + old_cwd = os.getcwd() + os.chdir(cwd) + else: + old_cwd = None + cwd = os.getcwd() + + try: + logger.debug("Running repro in '%s'", cwd) + dvc = Repo(dvc_dir) + dvc.checkout() + stages = dvc.reproduce(on_unchanged=filter_pipeline, **kwargs) + finally: + if old_cwd is not None: + os.chdir(old_cwd) + + # ideally we would return stages here like a normal repro() call, but + # stages is not currently picklable and cannot be returned across + # multiprocessing calls + return hash_exp(stages + unchanged) + + def cleanup(self): + logger.debug("Removing tmpdir '%s'", self.tmp_dir) + self.tmp_dir.cleanup() + super().cleanup() diff --git a/dvc/repo/experiments/show.py b/dvc/repo/experiments/show.py index bbb2c706ff..da84079e7b 100644 --- a/dvc/repo/experiments/show.py +++ b/dvc/repo/experiments/show.py @@ -12,7 +12,7 @@ EXP_RE = re.compile(r"(?P[a-f0-9]{7})-(?P[a-f0-9]+)") -def _collect_experiment(repo, branch): +def _collect_experiment(repo, branch, stash=False): res = defaultdict(dict) for rev in repo.brancher(revs=[branch]): configs = _collect_configs(repo) @@ -20,9 +20,10 @@ def _collect_experiment(repo, branch): if params: res["params"] = params - metrics = _collect_metrics(repo, None, False) - vals = _read_metrics(repo, metrics, rev) - if vals: + res["queued"] = stash + if not stash: + metrics = _collect_metrics(repo, None, False) + vals = _read_metrics(repo, metrics, rev) res["metrics"] = vals return res @@ -37,8 +38,9 @@ def show( if revs is None: revs = [repo.scm.get_rev()] - revs = set( - repo.brancher( + revs = OrderedDict( + (rev, None) + for rev in repo.brancher( revs=revs, all_branches=all_branches, all_tags=all_tags, @@ -49,6 +51,7 @@ def show( for rev in revs: res[rev]["baseline"] = _collect_experiment(repo, rev) + # collect reproduced experiments for exp_branch in repo.experiments.scm.list_branches(): m = re.match(EXP_RE, exp_branch) if m: @@ -61,4 +64,12 @@ def show( ) res[rev][exp_rev] = experiment + # collect queued (not yet reproduced) experiments + for stash_rev, (_, baseline_rev) in repo.experiments.stash_revs.items(): + with repo.experiments.chdir(): + experiment = _collect_experiment( + repo.experiments.exp_dvc, stash_rev, stash=True + ) + res[baseline_rev][stash_rev] = experiment + return res diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index e127c4ea42..f94f9dd89f 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -71,13 +71,20 @@ def reproduce( ) experiment = kwargs.pop("experiment", False) - if experiment and self.experiments: + queue = kwargs.pop("queue", False) + run_all = kwargs.pop("run_all", False) + jobs = kwargs.pop("jobs", 1) + if (experiment or run_all) and self.experiments: try: - return self.experiments.new( + return _reproduce_experiments( + self, target=target, recursive=recursive, all_pipelines=all_pipelines, - **kwargs + queue=queue, + run_all=run_all, + jobs=jobs, + **kwargs, ) except UnchangedExperimentError: # If experiment contains no changes, just run regular repro @@ -109,8 +116,14 @@ def reproduce( return _reproduce_stages(active_graph, targets, **kwargs) +def _reproduce_experiments(repo, run_all=False, jobs=1, **kwargs): + if run_all: + return repo.experiments.reproduce_queued(jobs=jobs) + return repo.experiments.reproduce_one(**kwargs) + + def _reproduce_stages( - G, stages, downstream=False, single_item=False, **kwargs + G, stages, downstream=False, single_item=False, on_unchanged=None, **kwargs ): r"""Derive the evaluation of the given node for the given graph. @@ -194,7 +207,10 @@ def _reproduce_stages( # dependencies didn't change. kwargs["force"] = True - result.extend(ret) + if ret: + result.extend(ret) + elif on_unchanged is not None: + on_unchanged(stage) except Exception as exc: raise ReproductionError(stage.relpath) from exc diff --git a/tests/func/experiments/test_show.py b/tests/func/experiments/test_show.py index bfcb595e49..9571a8adb5 100644 --- a/tests/func/experiments/test_show.py +++ b/tests/func/experiments/test_show.py @@ -15,5 +15,6 @@ def test_show_simple(tmp_dir, scm, dvc): "baseline": { "metrics": {"metrics.yaml": {"foo": 1}}, "params": {"params.yaml": {"foo": 1}}, + "queued": False, } } diff --git a/tests/unit/command/test_repro.py b/tests/unit/command/test_repro.py index fbcf1271fc..0b2302ef74 100644 --- a/tests/unit/command/test_repro.py +++ b/tests/unit/command/test_repro.py @@ -15,6 +15,9 @@ "recursive": False, "force_downstream": False, "experiment": False, + "queue": False, + "run_all": False, + "jobs": None, }