diff --git a/dvc/api.py b/dvc/api/__init__.py similarity index 100% rename from dvc/api.py rename to dvc/api/__init__.py diff --git a/dvc/api/live.py b/dvc/api/live.py new file mode 100644 index 0000000000..39fb59c388 --- /dev/null +++ b/dvc/api/live.py @@ -0,0 +1,24 @@ +import logging +import os +from typing import List + +from dvc.exceptions import NotDvcRepoError +from dvc.repo import Repo +from dvc.utils.html import write + +logger = logging.getLogger(__name__) + + +def summary(path: str, revs: List[str] = None): + try: + root = Repo.find_root() + except NotDvcRepoError: + root = os.getcwd() + + metrics, plots = Repo(root_dir=root, uninitialized=True).live.show( + path, revs + ) + + html_path = path + ".html" + write(html_path, plots, metrics) + logger.info(f"\nfile://{os.path.abspath(html_path)}") diff --git a/dvc/cli.py b/dvc/cli.py index 6600194c0a..48ebbc2920 100644 --- a/dvc/cli.py +++ b/dvc/cli.py @@ -27,6 +27,7 @@ imp_url, init, install, + live, ls, metrics, move, @@ -82,6 +83,7 @@ plots, experiments, check_ignore, + live, ] diff --git a/dvc/command/live.py b/dvc/command/live.py new file mode 100644 index 0000000000..5c1d5faf05 --- /dev/null +++ b/dvc/command/live.py @@ -0,0 +1,94 @@ +import argparse +import logging +import os + +from dvc.command import completion +from dvc.command.base import CmdBase, fix_subparsers +from dvc.utils.html import write + +logger = logging.getLogger(__name__) + + +class CmdLive(CmdBase): + UNINITIALIZED = True + + def _run(self, target, revs=None): + metrics, plots = self.repo.live.show(target=target, revs=revs) + + html_path = self.args.target + ".html" + write(html_path, plots, metrics) + + logger.info(f"\nfile://{os.path.abspath(html_path)}") + + return 0 + + +class CmdLiveShow(CmdLive): + def run(self): + return self._run(self.args.target) + + +class CmdLiveDiff(CmdLive): + def run(self): + return self._run(self.args.target, self.args.revs) + + +def shared_parent_parser(): + parent_parser = argparse.ArgumentParser(add_help=False) + parent_parser.add_argument( + "target", help="Logs dir to produce summary from", + ).complete = completion.DIR + parent_parser.add_argument( + "-o", + "--out", + default=None, + help="Destination path to save plots to", + metavar="", + ).complete = completion.DIR + return parent_parser + + +def add_parser(subparsers, parent_parser): + LIVE_DESCRIPTION = ( + "Commands to visualize and compare dvclive-produced logs." + ) + live_parser = subparsers.add_parser( + "live", + parents=[parent_parser], + formatter_class=argparse.RawDescriptionHelpFormatter, + description=LIVE_DESCRIPTION, + ) + live_subparsers = live_parser.add_subparsers( + dest="cmd", + help="Use `dvc live CMD --help` to display command-specific help.", + ) + + fix_subparsers(live_subparsers) + + SHOW_HELP = "Visualize dvclive directory content." + live_show_parser = live_subparsers.add_parser( + "show", + parents=[parent_parser, shared_parent_parser()], + help=SHOW_HELP, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + live_show_parser.set_defaults(func=CmdLiveShow) + + DIFF_HELP = ( + "Show multiple versions of dvclive data, " + "by plotting it in single view." + ) + live_diff_parser = live_subparsers.add_parser( + "diff", + parents=[parent_parser, shared_parent_parser()], + help=DIFF_HELP, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + live_diff_parser.add_argument( + "--revs", + nargs="*", + default=None, + help="Git revision (e.g. SHA, branch, tag)", + metavar="", + ) + live_diff_parser.set_defaults(func=CmdLiveDiff) diff --git a/dvc/command/plots.py b/dvc/command/plots.py index ef55b3298c..169d4f24bf 100644 --- a/dvc/command/plots.py +++ b/dvc/command/plots.py @@ -7,28 +7,10 @@ from dvc.exceptions import DvcException from dvc.schema import PLOT_PROPS from dvc.utils import format_link +from dvc.utils.html import write logger = logging.getLogger(__name__) -PAGE_HTML = """ - - - DVC Plot - - - - - - {divs} - -""" - -DIV_HTML = """
-""" - class CmdPlots(CmdBase): def _func(self, *args, **kwargs): @@ -58,16 +40,10 @@ def run(self): logger.info(plots[target]) return 0 - divs = [ - DIV_HTML.format(id=f"plot{i}", vega_json=plot) - for i, plot in enumerate(plots.values()) - ] - html = PAGE_HTML.format(divs="\n".join(divs)) path = self.args.out or "plots.html" - path = os.path.join(os.getcwd(), path) - with open(path, "w") as fobj: - fobj.write(html) + + write(path, plots) logger.info(f"file://{path}") diff --git a/dvc/command/run.py b/dvc/command/run.py index 211cc1de7f..3499cf41c3 100644 --- a/dvc/command/run.py +++ b/dvc/command/run.py @@ -42,6 +42,8 @@ def run(self): metrics_no_cache=self.args.metrics_no_cache, plots=self.args.plots, plots_no_cache=self.args.plots_no_cache, + live=self.args.live, + live_summary=not self.args.live_no_summary, deps=self.args.deps, params=self.args.params, fname=self.args.file, @@ -162,6 +164,15 @@ def add_parser(subparsers, parent_parser): help="Declare output plot file (do not put into DVC cache).", metavar="", ) + run_parser.add_argument( + "--live", help=argparse.SUPPRESS, metavar="", + ) + run_parser.add_argument( + "--live-no-summary", + action="store_true", + default=False, + help=argparse.SUPPRESS, + ) run_parser.add_argument( "--file", metavar="", help=argparse.SUPPRESS, ) diff --git a/dvc/env.py b/dvc/env.py index d1b40671cb..5ddcfbc989 100644 --- a/dvc/env.py +++ b/dvc/env.py @@ -2,3 +2,5 @@ DVC_DAEMON = "DVC_DAEMON" DVC_PAGER = "DVC_PAGER" DVC_ROOT = "DVC_ROOT" +DVCLIVE_PATH = "DVCLIVE_PATH" +DVCLIVE_SUMMARY = "DVCLIVE_SUMMARY" diff --git a/dvc/exceptions.py b/dvc/exceptions.py index 3d182233cd..e89cd9c283 100644 --- a/dvc/exceptions.py +++ b/dvc/exceptions.py @@ -196,9 +196,9 @@ def __init__(self, command, run_options): class MetricDoesNotExistError(MetricsError): def __init__(self, targets: List[str]): if len(targets) == 1: - msg = "File: '{}' does not exist." + msg = "'{}' does not exist." else: - msg = "Files: '{}' do not exist." + msg = "'{}' do not exist." super().__init__(msg.format(", ".join(targets))) diff --git a/dvc/output/__init__.py b/dvc/output/__init__.py index deacd18dbe..7123951b8f 100644 --- a/dvc/output/__init__.py +++ b/dvc/output/__init__.py @@ -81,6 +81,7 @@ def _get( plot=False, persist=False, checkpoint=False, + live=False, desc=None, isexec=False, ): @@ -98,6 +99,7 @@ def _get( plot=plot, persist=persist, checkpoint=checkpoint, + live=live, desc=desc, isexec=isexec, ) @@ -114,6 +116,7 @@ def _get( plot=plot, persist=persist, checkpoint=checkpoint, + live=live, desc=desc, isexec=isexec, ) @@ -127,6 +130,7 @@ def _get( plot=plot, persist=persist, checkpoint=checkpoint, + live=live, desc=desc, isexec=isexec, ) @@ -143,6 +147,7 @@ def loadd_from(stage, d_list): checkpoint = d.pop(BaseOutput.PARAM_CHECKPOINT, False) desc = d.pop(BaseOutput.PARAM_DESC, False) isexec = d.pop(BaseOutput.PARAM_ISEXEC, False) + live = d.pop(BaseOutput.PARAM_LIVE, False) ret.append( _get( stage, @@ -155,6 +160,7 @@ def loadd_from(stage, d_list): checkpoint=checkpoint, desc=desc, isexec=isexec, + live=live, ) ) return ret @@ -169,6 +175,7 @@ def loads_from( persist=False, checkpoint=False, isexec=False, + live=False, ): return [ _get( @@ -181,6 +188,7 @@ def loads_from( persist=persist, checkpoint=checkpoint, isexec=isexec, + live=live, ) for s in s_list ] @@ -209,21 +217,35 @@ def _merge_data(s_list): @collecting -def load_from_pipeline(stage, s_list, typ="outs"): - if typ not in (stage.PARAM_OUTS, stage.PARAM_METRICS, stage.PARAM_PLOTS): +def load_from_pipeline(stage, data, typ="outs"): + if typ not in ( + stage.PARAM_OUTS, + stage.PARAM_METRICS, + stage.PARAM_PLOTS, + stage.PARAM_LIVE, + ): raise ValueError(f"'{typ}' key is not allowed for pipeline files.") metric = typ == stage.PARAM_METRICS plot = typ == stage.PARAM_PLOTS + live = typ == stage.PARAM_LIVE - d = _merge_data(s_list) + if live: + # `live` is single object + data = [data] + + d = _merge_data(data) for path, flags in d.items(): - plt_d = {} + plt_d, live_d = {}, {} if plot: from dvc.schema import PLOT_PROPS plt_d, flags = _split_dict(flags, keys=PLOT_PROPS.keys()) + if live: + from dvc.schema import LIVE_PROPS + + live_d, flags = _split_dict(flags, keys=LIVE_PROPS.keys()) extra = project( flags, [ @@ -232,4 +254,13 @@ def load_from_pipeline(stage, s_list, typ="outs"): BaseOutput.PARAM_CHECKPOINT, ], ) - yield _get(stage, path, {}, plot=plt_d or plot, metric=metric, **extra) + + yield _get( + stage, + path, + {}, + plot=plt_d or plot, + metric=metric, + live=live_d or live, + **extra, + ) diff --git a/dvc/output/base.py b/dvc/output/base.py index cd8f5afd6c..114567611a 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -1,13 +1,15 @@ import logging import os from copy import copy -from typing import Type +from typing import Dict, Type from urllib.parse import urlparse +from funcy import cached_property, project from voluptuous import Any import dvc.prompt as prompt from dvc.cache import NamedCache +from dvc.env import DVCLIVE_PATH, DVCLIVE_SUMMARY from dvc.exceptions import ( CheckoutError, CollectCacheError, @@ -77,6 +79,8 @@ class BaseOutput: PARAM_PERSIST = "persist" PARAM_DESC = "desc" PARAM_ISEXEC = "isexec" + PARAM_LIVE = "live" + PARAM_LIVE_SUMMARY = "summary" METRIC_SCHEMA = Any( None, @@ -105,6 +109,7 @@ def __init__( plot=False, persist=False, checkpoint=False, + live=False, desc=None, isexec=False, ): @@ -132,6 +137,7 @@ def __init__( self.plot = False if self.IS_DEPENDENCY else plot self.persist = persist self.checkpoint = checkpoint + self.live = live self.desc = desc self.path_info = self._parse_path(tree, path) @@ -341,6 +347,9 @@ def dumpd(self): if self.isexec: ret[self.PARAM_ISEXEC] = self.isexec + if self.live: + ret[self.PARAM_LIVE] = self.live + return ret def verify_metric(self): @@ -591,3 +600,19 @@ def merge(self, ancestor, other): self.hash_info = self.cache.merge( ancestor_info, self.hash_info, other.hash_info ) + + @cached_property + def env(self) -> Dict[str, str]: + if self.live: + from dvc.schema import LIVE_PROPS + + env = {DVCLIVE_PATH: str(self.path_info)} + if isinstance(self.live, dict): + + config = project(self.live, LIVE_PROPS) + + env[DVCLIVE_SUMMARY] = str( + int(config.get(BaseOutput.PARAM_LIVE_SUMMARY, True)) + ) + return env + return {} diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index fcc76bff44..71aae71e39 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -133,6 +133,7 @@ def __init__( from dvc.cache import Cache from dvc.data_cloud import DataCloud from dvc.lock import LockNoop, make_lock + from dvc.repo.live import Live from dvc.repo.metrics import Metrics from dvc.repo.params import Params from dvc.repo.plots import Plots @@ -184,6 +185,7 @@ def __init__( self.plots = Plots(self) self.params = Params(self) self.stage_collection_error_handler = None + self.live = Live(self) self._lock_depth = 0 @cached_property diff --git a/dvc/repo/collect.py b/dvc/repo/collect.py index 32dd924738..e0f833651e 100644 --- a/dvc/repo/collect.py +++ b/dvc/repo/collect.py @@ -42,7 +42,7 @@ def _collect_paths( if recursive and tree.isdir(path_info): target_infos.update(set(tree.walk_files(path_info))) - if not tree.isfile(path_info): + if not tree.exists(path_info): if not recursive: logger.warning( "'%s' was not found at: '%s'.", path_info, rev, diff --git a/dvc/repo/live.py b/dvc/repo/live.py new file mode 100644 index 0000000000..126d15893e --- /dev/null +++ b/dvc/repo/live.py @@ -0,0 +1,40 @@ +import contextlib +import os +from typing import List + +from dvc.exceptions import MetricDoesNotExistError, MetricsError +from dvc.output import BaseOutput +from dvc.path_info import PathInfo +from dvc.repo import Repo + + +def summary_path_info(out: BaseOutput) -> PathInfo: + assert out.live + has_summary = True + if isinstance(out.live, dict): + has_summary = out.live.get(BaseOutput.PARAM_LIVE_SUMMARY, True) + if has_summary: + return out.path_info.with_suffix(".json") + return None + + +class Live: + def __init__(self, repo: Repo): + self.repo = repo + + def show(self, target: str, revs: List[str] = None): + if revs: + revs = ["workspace", *revs] + + if not os.path.exists(target): + raise MetricDoesNotExistError([target]) + + metrics_path = target + ".json" + + metrics = None + with contextlib.suppress(MetricsError): + metrics = self.repo.metrics.show(targets=[metrics_path]) + + plots = self.repo.plots.show(target, recursive=True, revs=revs) + + return metrics, plots diff --git a/dvc/repo/metrics/show.py b/dvc/repo/metrics/show.py index 2f5c8c9869..1a669be305 100644 --- a/dvc/repo/metrics/show.py +++ b/dvc/repo/metrics/show.py @@ -1,12 +1,16 @@ import logging +from typing import List from dvc.exceptions import ( MetricDoesNotExistError, NoMetricsFoundError, NoMetricsParsedError, ) +from dvc.output import BaseOutput +from dvc.path_info import PathInfo from dvc.repo import locked from dvc.repo.collect import collect +from dvc.repo.live import summary_path_info from dvc.scm.base import SCMError from dvc.tree.repo import RepoTree from dvc.utils.serialize import YAMLFileCorruptedError, load_yaml @@ -14,8 +18,20 @@ logger = logging.getLogger(__name__) -def _is_metric(out): - return bool(out.metric) +def _is_metric(out: BaseOutput) -> bool: + return bool(out.metric) or bool(out.live) + + +def _to_path_infos(metrics: List[BaseOutput]) -> List[PathInfo]: + result = [] + for out in metrics: + if out.metric: + result.append(out.path_info) + elif out.live: + path_info = summary_path_info(out) + if path_info: + result.append(path_info) + return result def _collect_metrics(repo, targets, revision, recursive): @@ -26,7 +42,7 @@ def _collect_metrics(repo, targets, revision, recursive): recursive=recursive, rev=revision, ) - return [m.path_info for m in metrics] + list(path_infos) + return _to_path_infos(metrics) + list(path_infos) def _extract_metrics(metrics, path, rev): @@ -59,7 +75,7 @@ def _read_metrics(repo, metrics, rev): res = {} for metric in metrics: - if not tree.exists(metric): + if not tree.isfile(metric): continue try: diff --git a/dvc/repo/plots/__init__.py b/dvc/repo/plots/__init__.py index fa030c177e..f0402b47b1 100644 --- a/dvc/repo/plots/__init__.py +++ b/dvc/repo/plots/__init__.py @@ -1,4 +1,5 @@ import logging +from typing import Dict, List from funcy import cached_property, first, project @@ -8,6 +9,8 @@ NoMetricsFoundError, NoMetricsParsedError, ) +from dvc.output import BaseOutput +from dvc.repo import Repo from dvc.repo.collect import collect from dvc.repo.plots.data import PlotParsingError from dvc.schema import PLOT_PROPS @@ -33,7 +36,12 @@ class Plots: def __init__(self, repo): self.repo = repo - def collect(self, targets=None, revs=None): + def collect( + self, + targets: List[str] = None, + revs: List[str] = None, + recursive: bool = False, + ) -> Dict[str, Dict]: """Collects all props and data for plots. Returns a structure like: @@ -52,20 +60,33 @@ def collect(self, targets=None, revs=None): rev = rev or "workspace" tree = RepoTree(self.repo) - plots = _collect_plots(self.repo, targets, rev) + plots = _collect_plots(self.repo, targets, rev, recursive) for path_info, props in plots.items(): - datafile = relpath(path_info, self.repo.root_dir) + if rev not in data: data[rev] = {} - data[rev].update({datafile: {"props": props}}) - # Load data from git or dvc cache - try: - with tree.open(path_info) as fd: - data[rev][datafile]["data"] = fd.read() - except FileNotFoundError: - # This might happen simply because cache is absent - pass + if tree.isdir(path_info): + plot_files = [] + for pi in tree.walk_files(path_info): + plot_files.append( + (pi, relpath(pi, self.repo.root_dir)) + ) + else: + plot_files = [ + (path_info, relpath(path_info, self.repo.root_dir)) + ] + + for path, repo_path in plot_files: + data[rev].update({repo_path: {"props": props}}) + + # Load data from git or dvc cache + try: + with tree.open(path) as fd: + data[rev][repo_path]["data"] = fd.read() + except FileNotFoundError: + # This might happen simply because cache is absent + pass return data @@ -78,7 +99,7 @@ def render(data, revs=None, props=None, templates=None): plots = _prepare_plots(data, revs, props) result = {} - for datafile, desc in plots.items(): + for datafile, desc in sorted(plots.items()): try: result[datafile] = _render( datafile, desc["data"], desc["props"], templates @@ -96,15 +117,27 @@ def render(data, revs=None, props=None, templates=None): return result - def show(self, targets=None, revs=None, props=None, templates=None): + def show( + self, + targets=None, + revs=None, + props=None, + templates=None, + recursive=False, + ): - data = self.collect(targets, revs) + data = self.collect(targets, revs, recursive) # If any mentioned plot doesn't have any data then that's an error targets = [targets] if isinstance(targets, str) else targets or [] for target in targets: rpath = relpath(target, self.repo.root_dir) - if not any("data" in d[rpath] for d in data.values()): + if not any( + "data" in rev_data[key] + for rev_data in data.values() + for key, d in rev_data.items() + if rpath in key + ): raise MetricDoesNotExistError([target]) # No data at all is a special error with a special message @@ -168,21 +201,30 @@ def templates(self): return PlotTemplates(self.repo.dvc_dir) -def _is_plot(out): - return bool(out.plot) +def _is_plot(out: BaseOutput) -> bool: + return bool(out.plot) or bool(out.live) -def _collect_plots(repo, targets=None, rev=None): +def _collect_plots( + repo: Repo, + targets: List[str] = None, + rev: str = None, + recursive: bool = False, +) -> Dict[str, Dict]: plots, path_infos = collect( - repo, output_filter=_is_plot, targets=targets, rev=rev + repo, + output_filter=_is_plot, + targets=targets, + rev=rev, + recursive=recursive, ) result = {plot.path_info: _plot_props(plot) for plot in plots} result.update({path_info: {} for path_info in path_infos}) return result -def _plot_props(out): - if not out.plot: +def _plot_props(out: BaseOutput) -> Dict: + if not (out.plot or out.live): raise NotAPlotError(out) if isinstance(out.plot, list): raise DvcException("Multiple plots per data file not supported.") diff --git a/dvc/repo/plots/data.py b/dvc/repo/plots/data.py index 4434b73660..afb8006dd2 100644 --- a/dvc/repo/plots/data.py +++ b/dvc/repo/plots/data.py @@ -125,15 +125,9 @@ def _find_data(data, fields=None, **kwargs): def _append_index(data_points, append_index=False, **kwargs): - if not append_index: + if not append_index or PlotData.INDEX_FIELD in first(data_points).keys(): return data_points - if PlotData.INDEX_FIELD in first(data_points).keys(): - raise DvcException( - "Cannot append index. Field of same name ('{}') found in data. " - "Use `-x` to specify x axis field.".format(PlotData.INDEX_FIELD) - ) - for index, data_point in enumerate(data_points): data_point[PlotData.INDEX_FIELD] = index return data_points @@ -147,7 +141,7 @@ def _append_revision(data_points, revision, **kwargs): class PlotData: REVISION_FIELD = "rev" - INDEX_FIELD = "index" + INDEX_FIELD = "step" def __init__(self, filename, revision, content, **kwargs): self.filename = filename diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index fd27d7c7c4..b36395588a 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -67,6 +67,12 @@ def _track_stage(stage): for out in stage.outs: if not out.use_scm_ignore and out.is_in_repo: stage.repo.scm.track_file(os.fspath(out.path_info)) + if out.live: + from dvc.repo.live import summary_path_info + + summary = summary_path_info(out) + if summary: + stage.repo.scm.track_file(os.fspath(summary)) stage.repo.scm.track_changed_files() diff --git a/dvc/repo/run.py b/dvc/repo/run.py index f4a06b8c9d..08e1c92fb7 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -1,7 +1,7 @@ import os from contextlib import suppress -from funcy import concat, first, lfilter +from funcy import concat, first, lfilter, without from dvc.exceptions import InvalidArgumentError from dvc.stage import PipelineStage @@ -49,6 +49,7 @@ def _get_file_path(kwargs): kwargs.get("outs_persist", []), kwargs.get("outs_persist_no_cache", []), kwargs.get("checkpoints", []), + without([kwargs.get("live", None)], None), ) ) diff --git a/dvc/schema.py b/dvc/schema.py index 740ae629ef..a7199ef4fa 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -59,6 +59,12 @@ } PLOT_PSTAGE_SCHEMA = {str: Any(PLOT_PROPS_SCHEMA, [PLOT_PROPS_SCHEMA])} +LIVE_PROPS = { + BaseOutput.PARAM_LIVE_SUMMARY: bool, +} +LIVE_PROPS_SCHEMA = {**PLOT_PROPS_SCHEMA, **LIVE_PROPS} +LIVE_PSTAGE_SCHEMA = {str: LIVE_PROPS_SCHEMA} + PARAM_PSTAGE_NON_DEFAULT_SCHEMA = {str: [str]} VARS_SCHEMA = [str, dict] @@ -78,6 +84,7 @@ Any(str, OUT_PSTAGE_DETAILED_SCHEMA) ], Optional(StageParams.PARAM_PLOTS): [Any(str, PLOT_PSTAGE_SCHEMA)], + Optional(StageParams.PARAM_LIVE): Any(str, LIVE_PSTAGE_SCHEMA), } FOREACH_IN = { diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 505aa0e790..07b2879af6 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -3,7 +3,7 @@ import string from collections import defaultdict from dataclasses import dataclass -from typing import Optional +from typing import Dict, Optional from funcy import cached_property, project @@ -251,6 +251,15 @@ def is_checkpoint(self): """ return any(out.checkpoint for out in self.outs) + @property + def env(self) -> Dict[str, str]: + env = {} + for out in self.outs: + if any(out.env.keys() and env.keys()): + raise DvcException("Duplicated env variable") + env.update(out.env) + return env + def changed_deps(self): if self.frozen: return False diff --git a/dvc/stage/loader.py b/dvc/stage/loader.py index 05e1d772f9..4dd0a92741 100644 --- a/dvc/stage/loader.py +++ b/dvc/stage/loader.py @@ -82,7 +82,12 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data=None): outs = project( stage_data, - [stage.PARAM_OUTS, stage.PARAM_METRICS, stage.PARAM_PLOTS], + [ + stage.PARAM_OUTS, + stage.PARAM_METRICS, + stage.PARAM_PLOTS, + stage.PARAM_LIVE, + ], ) stage.outs = lcat( output.load_from_pipeline(stage, data, typ=key) diff --git a/dvc/stage/params.py b/dvc/stage/params.py index 14ca18b250..c43a75b183 100644 --- a/dvc/stage/params.py +++ b/dvc/stage/params.py @@ -12,3 +12,4 @@ class StageParams: PARAM_METRICS = "metrics" PARAM_PLOTS = "plots" PARAM_DESC = "desc" + PARAM_LIVE = "live" diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 1182cd1341..45cb6caa8b 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -56,6 +56,8 @@ def prepare_kwargs(stage, checkpoint_func=None): # indicate that checkpoint cmd is being run inside DVC kwargs["env"].update(_checkpoint_env(stage)) + kwargs["env"].update(stage.env) + # NOTE: when you specify `shell=True`, `Popen` [1] will default to # `/bin/sh` on *nix and will add ["/bin/sh", "-c"] to your command. # But we actually want to run the same shell that we are running diff --git a/dvc/stage/serialize.py b/dvc/stage/serialize.py index 4c1d76e371..a165f3c690 100644 --- a/dvc/stage/serialize.py +++ b/dvc/stage/serialize.py @@ -50,6 +50,8 @@ def _get_flags(out): # `out.plot` is in the same order as is in the file when read # and, should be dumped as-is without any sorting yield from out.plot.items() + if out.live and isinstance(out.live, dict): + yield from out.live.items() def _serialize_out(out): @@ -59,15 +61,19 @@ def _serialize_out(out): @no_type_check def _serialize_outs(outputs: List[BaseOutput]): - outs, metrics, plots = [], [], [] + outs, metrics, plots, live = [], [], [], None for out in sort_by_path(outputs): bucket = outs if out.plot: bucket = plots elif out.metric: bucket = metrics + elif out.live: + assert live is None + live = _serialize_out(out) + continue bucket.append(_serialize_out(out)) - return outs, metrics, plots + return outs, metrics, plots, live def _serialize_params_keys(params): @@ -122,7 +128,7 @@ def to_pipeline_file(stage: "PipelineStage"): deps = sorted(d.def_path for d in deps) params = _serialize_params_keys(params) - outs, metrics, plots = _serialize_outs(stage.outs) + outs, metrics, plots, live = _serialize_outs(stage.outs) res = [ (stage.PARAM_DESC, stage.desc), (stage.PARAM_CMD, stage.cmd), @@ -132,6 +138,7 @@ def to_pipeline_file(stage: "PipelineStage"): (stage.PARAM_OUTS, outs), (stage.PARAM_METRICS, metrics), (stage.PARAM_PLOTS, plots), + (stage.PARAM_LIVE, live), (stage.PARAM_FROZEN, stage.frozen), (stage.PARAM_ALWAYS_CHANGED, stage.always_changed), (stage.PARAM_META, stage.meta), diff --git a/dvc/stage/utils.py b/dvc/stage/utils.py index df801802eb..8974537f9a 100644 --- a/dvc/stage/utils.py +++ b/dvc/stage/utils.py @@ -57,6 +57,11 @@ def fill_stage_outputs(stage, **kwargs): ] stage.outs = [] + + stage.outs += _load_live_outputs( + stage, kwargs.get("live", None), kwargs.get("live_summary", False) + ) + for key in keys: stage.outs += output.loads_from( stage, @@ -69,6 +74,22 @@ def fill_stage_outputs(stage, **kwargs): ) +def _load_live_outputs(stage, live_l=None, live_summary=False): + from dvc.output import BaseOutput + + outs = [] + if live_l: + + outs += output.loads_from( + stage, + [live_l], + use_cache=False, + live={BaseOutput.PARAM_LIVE_SUMMARY: live_summary}, + ) + + return outs + + def fill_stage_dependencies(stage, deps=None, erepo=None, params=None): assert not stage.deps stage.deps = [] diff --git a/dvc/utils/html.py b/dvc/utils/html.py new file mode 100644 index 0000000000..7bf4ae27ba --- /dev/null +++ b/dvc/utils/html.py @@ -0,0 +1,71 @@ +from typing import Dict, List, Optional + +import tabulate + +PAGE_HTML = """ + + + DVC Plot + + + + + + {divs} + +""" + +VEGA_DIV_HTML = """
+""" + + +class HTML: + def __init__(self): + self.elements = [] + + def with_metrics(self, metrics: Dict[str, Dict]) -> "HTML": + header: List[str] = [] + rows: List[List[str]] = [] + + for _, rev_data in metrics.items(): + for _, data in rev_data.items(): + if not header: + header.extend(sorted(data.keys())) + + rows.append([data[key] for key in header]) + + self.elements.append(tabulate.tabulate(rows, header, tablefmt="html")) + return self + + def with_plots(self, plots: Dict[str, Dict]) -> "HTML": + self.elements.extend( + [ + VEGA_DIV_HTML.format(id=f"plot{i}", vega_json=plot) + for i, plot in enumerate(plots.values()) + ] + ) + return self + + def with_element(self, html: str) -> "HTML": + self.elements.append(html) + return self + + def embed(self) -> str: + return PAGE_HTML.format(divs="\n".join(self.elements)) + + +def write( + path, plots: Dict[str, Dict], metrics: Optional[Dict[str, Dict]] = None +): + document = HTML() + if metrics: + document.with_metrics(metrics) + document.with_element("
") + + document.with_plots(plots) + + with open(path, "w") as fd: + fd.write(document.embed()) diff --git a/tests/func/api/__init__.py b/tests/func/api/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/func/test_api.py b/tests/func/api/test_api.py similarity index 100% rename from tests/func/test_api.py rename to tests/func/api/test_api.py diff --git a/tests/func/api/test_live.py b/tests/func/api/test_live.py new file mode 100644 index 0000000000..3edc69e36f --- /dev/null +++ b/tests/func/api/test_live.py @@ -0,0 +1,55 @@ +import io +import json +import logging +from typing import Dict, List + +import pytest +from funcy import last + +from dvc.api.live import summary +from tests.utils import dump_sv + + +def _dumps_tsv(metrics: List[Dict]): + stream = io.StringIO() + dump_sv(stream, metrics) + stream.seek(0) + return stream.read() + + +@pytest.fixture +def live_results(tmp_dir): + def make(path="logs"): + datapoints = [{"metric": 0.0, "step": 0}, {"metric": 0.5, "step": 1}] + tmp_dir.gen( + { + (tmp_dir / path).with_suffix(".json"): json.dumps( + last(datapoints) + ), + (tmp_dir / path / "metric.tsv"): _dumps_tsv(datapoints), + } + ) + + yield make + + +def test_live_summary_no_repo(tmp_dir, live_results, caplog): + live_results("logs") + + with caplog.at_level(logging.INFO, logger="dvc"): + summary("logs") + + summary_path = tmp_dir / "logs.html" + assert summary_path.exists() + assert f"file://{str(summary_path)}" in caplog.text + + +def test_live_summary(tmp_dir, dvc, live_results, caplog): + live_results("logs") + + with caplog.at_level(logging.INFO, logger="dvc"): + summary("logs") + + summary_path = tmp_dir / "logs.html" + assert summary_path.exists() + assert f"file://{str(summary_path)}" in caplog.text diff --git a/tests/func/plots/test_modify.py b/tests/func/plots/test_modify.py index 3cc2fc9513..06a434ac01 100644 --- a/tests/func/plots/test_modify.py +++ b/tests/func/plots/test_modify.py @@ -1,3 +1,6 @@ +import json +import os + import pytest from dvc.dvcfile import PIPELINE_LOCK @@ -67,3 +70,37 @@ def test_unset_nonexistent(tmp_dir, dvc, run_copy_metrics, custom_template): dvc.plots.modify( "metric.json", unset=["nonexistent"], ) + + +def test_dir_plots(tmp_dir, dvc, run_copy_metrics): + subdir = tmp_dir / "subdir" + subdir.mkdir() + + metric = [ + {"first_val": 100, "val": 2}, + {"first_val": 200, "val": 3}, + ] + + fname = "file.json" + _write_json(tmp_dir, metric, fname) + + p1 = os.path.join("subdir", "p1.json") + p2 = os.path.join("subdir", "p2.json") + tmp_dir.dvc.run( + cmd=( + f"mkdir subdir && python copy.py {fname} {p1} && " + f"python copy.py {fname} {p2}" + ), + deps=[fname], + single_stage=False, + plots=["subdir"], + name="copy_double", + ) + dvc.plots.modify("subdir", {"title": "TITLE"}) + + result = dvc.plots.show() + p1_content = json.loads(result[p1]) + p2_content = json.loads(result[p2]) + + assert p1_content["title"] == p2_content["title"] == "TITLE" + assert p1_content == p2_content diff --git a/tests/func/plots/test_show.py b/tests/func/plots/test_show.py index 1d5cc40d67..876ab55123 100644 --- a/tests/func/plots/test_show.py +++ b/tests/func/plots/test_show.py @@ -413,7 +413,7 @@ def test_throw_on_no_metric_at_all(tmp_dir, scm, dvc, caplog): # do not warn if none found assert len(caplog.messages) == 0 - assert str(error.value) == "File: 'plot.json' does not exist." + assert str(error.value) == "'plot.json' does not exist." def test_custom_template(tmp_dir, scm, dvc, custom_template, run_copy_metrics): @@ -760,3 +760,70 @@ def test_plots_show_overlap(tmp_dir, dvc, run_copy_metrics, clear_before_run): with pytest.raises(OverlappingOutputPathsError): dvc.plots.show() + + +def test_dir_plots(tmp_dir, dvc, run_copy_metrics): + subdir = tmp_dir / "subdir" + subdir.mkdir() + + metric = [ + {"first_val": 100, "val": 2}, + {"first_val": 200, "val": 3}, + ] + + fname = "file.json" + _write_json(tmp_dir, metric, fname) + + p1 = os.path.join("subdir", "p1.json") + p2 = os.path.join("subdir", "p2.json") + tmp_dir.dvc.run( + cmd=( + f"mkdir subdir && python copy.py {fname} {p1} && " + f"python copy.py {fname} {p2}" + ), + deps=[fname], + single_stage=False, + plots=["subdir"], + name="copy_double", + ) + dvc.plots.modify("subdir", {"title": "TITLE"}) + + result = dvc.plots.show() + p1_content = json.loads(result[p1]) + p2_content = json.loads(result[p2]) + + assert p1_content["title"] == p2_content["title"] == "TITLE" + + +def test_show_dir_plots(tmp_dir, dvc, run_copy_metrics): + subdir = tmp_dir / "subdir" + subdir.mkdir() + metric = [ + {"first_val": 100, "val": 2}, + {"first_val": 200, "val": 3}, + ] + + fname = "file.json" + _write_json(tmp_dir, metric, fname) + + p1 = os.path.join("subdir", "p1.json") + p2 = os.path.join("subdir", "p2.json") + tmp_dir.dvc.run( + cmd=( + f"mkdir subdir && python copy.py {fname} {p1} && " + f"python copy.py {fname} {p2}" + ), + deps=[fname], + single_stage=False, + plots=["subdir"], + name="copy_double", + ) + + result = dvc.plots.show(targets=["subdir"]) + p1_content = json.loads(result[p1]) + p2_content = json.loads(result[p2]) + + assert p1_content == p2_content + + result = dvc.plots.show(targets=[p1]) + assert set(result.keys()) == {p1} diff --git a/tests/func/plots/utils.py b/tests/func/plots/utils.py index 77f4697df2..3ebb039110 100644 --- a/tests/func/plots/utils.py +++ b/tests/func/plots/utils.py @@ -1,22 +1,11 @@ -import csv import json -from funcy import first +from tests.utils import dump_sv def _write_csv(metric, filename, header=True): with open(filename, "w", newline="") as csvobj: - if header: - writer = csv.DictWriter( - csvobj, fieldnames=list(first(metric).keys()) - ) - writer.writeheader() - writer.writerows(metric) - else: - writer = csv.writer(csvobj) - for d in metric: - assert len(d) == 1 - writer.writerow(list(d.values())) + dump_sv(csvobj, metric, delimiter=",", header=header) def _write_json(tmp_dir, metric, filename): diff --git a/tests/func/test_live.py b/tests/func/test_live.py new file mode 100644 index 0000000000..54790d6218 --- /dev/null +++ b/tests/func/test_live.py @@ -0,0 +1,117 @@ +from textwrap import dedent + +import pytest + +from dvc import stage as stage_module +from dvc.exceptions import MetricsError + +LIVE_SCRITP = dedent( + """ + from dvclive import dvclive + import sys + r = 2 + for i in range(r): + dvclive.log("loss", 1-i/r) + dvclive.log("accuracy", i/r) + dvclive.next_step()""" +) + + +@pytest.fixture +def live_stage(tmp_dir, scm, dvc): + + pytest.skip("dvclive does not exist yet") + + def make(summary=True): + tmp_dir.gen("train.py", LIVE_SCRITP) + tmp_dir.gen("params.yaml", "foo: 1") + stage = dvc.run( + cmd="python train.py", + params=["foo"], + deps=["train.py"], + name="live_stage", + live="logs", + live_summary=summary, + ) + + scm.add(["dvc.yaml", "dvc.lock", "train.py", "params.yaml"]) + scm.commit("initial: live_stage") + return stage + + yield make + + +@pytest.mark.parametrize("summary", (True, False)) +def test_export_config_tmp(tmp_dir, dvc, mocker, summary): + run_spy = mocker.spy(stage_module.run, "_run") + tmp_dir.gen("src", "dependency") + dvc.run( + cmd="mkdir logs && touch logs.json", + deps=["src"], + name="run_logger", + live="logs", + live_summary=summary, + ) + + assert run_spy.call_count == 1 + _, kwargs = run_spy.call_args + + assert "DVCLIVE_PATH" in kwargs["env"] + assert kwargs["env"]["DVCLIVE_PATH"] == "logs" + + assert "DVCLIVE_SUMMARY" in kwargs["env"] + assert kwargs["env"]["DVCLIVE_SUMMARY"] == str(int(summary)) + + +@pytest.mark.parametrize("summary", (True, False)) +def test_export_config(tmp_dir, dvc, mocker, summary, live_stage): + run_spy = mocker.spy(stage_module.run, "_run") + live_stage(summary=summary) + + assert run_spy.call_count == 1 + _, kwargs = run_spy.call_args + + assert "DVCLIVE_PATH" in kwargs["env"] + assert kwargs["env"]["DVCLIVE_PATH"] == "logs" + + assert "DVCLIVE_SUMMARY" in kwargs["env"] + assert kwargs["env"]["DVCLIVE_SUMMARY"] == str(int(summary)) + + +def test_live_provides_metrics(tmp_dir, dvc, live_stage): + live_stage(summary=True) + + assert (tmp_dir / "logs.json").is_file() + assert dvc.metrics.show() == { + "": {"logs.json": {"step": 1, "loss": 0.5, "accuracy": 0.5}} + } + + assert (tmp_dir / "logs").is_dir() + plots = dvc.plots.show() + assert "logs/accuracy.tsv" in plots + assert "logs/loss.tsv" in plots + + +def test_live_provides_no_metrics(tmp_dir, dvc, live_stage): + live_stage(summary=False) + + assert not (tmp_dir / "logs.json").is_file() + with pytest.raises(MetricsError): + assert dvc.metrics.show() == {} + + assert (tmp_dir / "logs").is_dir() + plots = dvc.plots.show() + assert "logs/accuracy.tsv" in plots + assert "logs/loss.tsv" in plots + + +def test_experiments_track_summary(tmp_dir, scm, dvc, live_stage): + live_stage(summary=True) + baseline_rev = scm.get_rev() + + experiments = dvc.experiments.run(targets=["live_stage"], params=["foo=2"]) + assert len(experiments) == 1 + ((exp_rev, _),) = experiments.items() + + res = dvc.experiments.show() + assert "logs.json" in res[baseline_rev][exp_rev]["metrics"].keys() diff --git a/tests/unit/command/test_live.py b/tests/unit/command/test_live.py new file mode 100644 index 0000000000..071d1f5b37 --- /dev/null +++ b/tests/unit/command/test_live.py @@ -0,0 +1,42 @@ +from dvc.cli import parse_args +from dvc.command.live import CmdLiveDiff, CmdLiveShow + + +def test_live_diff(dvc, mocker): + cli_args = parse_args( + [ + "live", + "diff", + "--out", + "result.extension", + "target", + "--revs", + "HEAD", + "rev1", + ] + ) + assert cli_args.func == CmdLiveDiff + + cmd = cli_args.func(cli_args) + m = mocker.patch("dvc.repo.live.Live.show", return_value=({}, {})) + + assert cmd.run() == 0 + + m.assert_called_once_with( + target="target", revs=["HEAD", "rev1"], + ) + + +def test_live_show(dvc, mocker): + cli_args = parse_args( + ["live", "show", "-o", "result.extension", "datafile"] + ) + assert cli_args.func == CmdLiveShow + + cmd = cli_args.func(cli_args) + + m = mocker.patch("dvc.repo.live.Live.show", return_value=({}, {})) + + assert cmd.run() == 0 + + m.assert_called_once_with(target="datafile", revs=None) diff --git a/tests/unit/command/test_run.py b/tests/unit/command/test_run.py index 6a2939099d..8ab68d4a6c 100644 --- a/tests/unit/command/test_run.py +++ b/tests/unit/command/test_run.py @@ -22,6 +22,9 @@ def test_run(mocker, dvc): "plots", "--plots-no-cache", "plots-no-cache", + "--live", + "live", + "--live-no-summary", "--file", "file", "--wdir", @@ -66,6 +69,8 @@ def test_run(mocker, dvc): outs_persist_no_cache=["outs-persist-no-cache"], checkpoints=["checkpoints"], params=["file:param1,param2", "param3"], + live="live", + live_summary=False, fname="file", wdir="wdir", no_exec=True, @@ -94,6 +99,8 @@ def test_run_args_from_cli(mocker, dvc): metrics_no_cache=[], plots=[], plots_no_cache=[], + live=None, + live_summary=True, outs_persist=[], outs_persist_no_cache=[], checkpoints=[], @@ -126,6 +133,8 @@ def test_run_args_with_spaces(mocker, dvc): metrics_no_cache=[], plots=[], plots_no_cache=[], + live=None, + live_summary=True, outs_persist=[], outs_persist_no_cache=[], checkpoints=[], diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index 91f63c6fb4..8656007856 100644 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -1,6 +1,9 @@ +import csv import os from contextlib import contextmanager +from funcy import first + from dvc.scm import Git @@ -21,3 +24,16 @@ def cd(newdir): def to_posixpath(path): return path.replace("\\", "/") + + +def dump_sv(stream, metrics, delimiter=",", header=True): + if header: + writer = csv.DictWriter( + stream, fieldnames=list(first(metrics).keys()), delimiter=delimiter + ) + writer.writeheader() + writer.writerows(metrics) + else: + writer = csv.writer(stream) + for d in metrics: + writer.writerow(list(d.values()))