From e09c42f58038ea8431cf5f18fa738847d4569e47 Mon Sep 17 00:00:00 2001
From: Ivan Shcheklein
Date: Sat, 10 Dec 2022 16:29:52 -0800
Subject: [PATCH] perf: remove fs exists check in plots, parallel data collect

---
 dvc/repo/collect.py             | 11 +----------
 dvc/repo/experiments/show.py    |  2 +-
 dvc/repo/metrics/show.py        | 10 +++-------
 dvc/repo/params/show.py         | 10 +++-------
 dvc/repo/plots/__init__.py      | 33 +++++++++++++++++++++++----------
 tests/func/metrics/test_show.py | 10 ++--------
 tests/func/plots/test_show.py   |  8 ++++++--
 tests/unit/test_collect.py      |  9 ---------
 8 files changed, 39 insertions(+), 54 deletions(-)

diff --git a/dvc/repo/collect.py b/dvc/repo/collect.py
index 9974852e20..b79bb73bbb 100644
--- a/dvc/repo/collect.py
+++ b/dvc/repo/collect.py
@@ -28,7 +28,6 @@ def _collect_paths(
     repo: "Repo",
     targets: Iterable[str],
     recursive: bool = False,
-    rev: str = None,
 ) -> StrPaths:
     from dvc.fs.dvc import DVCFileSystem

@@ -39,13 +38,6 @@
     for fs_path in fs_paths:
         if recursive and fs.isdir(fs_path):
             target_paths.extend(fs.find(fs_path))
-
-        rel = fs.path.relpath(fs_path)
-        if not fs.exists(fs_path):
-            if rev == "workspace" or rev == "":
-                logger.warning("'%s' was not found in current workspace.", rel)
-            else:
-                logger.warning("'%s' was not found at: '%s'.", rel, rev)
         target_paths.append(fs_path)

     return target_paths
@@ -73,7 +65,6 @@ def collect(
     deps: bool = False,
     targets: Iterable[str] = None,
     output_filter: FilterFn = None,
-    rev: str = None,
     recursive: bool = False,
     duplicates: bool = False,
 ) -> Tuple[Outputs, StrPaths]:
@@ -85,6 +76,6 @@
         fs_paths: StrPaths = []
         return outs, fs_paths

-    target_paths = _collect_paths(repo, targets, recursive=recursive, rev=rev)
+    target_paths = _collect_paths(repo, targets, recursive=recursive)

     return _filter_outs(outs, target_paths, duplicates=duplicates)
diff --git a/dvc/repo/experiments/show.py b/dvc/repo/experiments/show.py
index 4de0468032..c2251528c0 100644
--- a/dvc/repo/experiments/show.py
+++ b/dvc/repo/experiments/show.py
@@ -78,7 +78,7 @@ def collect_experiment_commit(
         result["timestamp"] = datetime.fromtimestamp(commit.commit_time)

         params = _gather_params(
-            repo, rev=rev, targets=None, deps=param_deps, onerror=onerror
+            repo, targets=None, deps=param_deps, onerror=onerror
         )
         if params:
             result["params"] = params
diff --git a/dvc/repo/metrics/show.py b/dvc/repo/metrics/show.py
index d482b2db01..a7b4a1158e 100644
--- a/dvc/repo/metrics/show.py
+++ b/dvc/repo/metrics/show.py
@@ -44,13 +44,9 @@ def _collect_top_level_metrics(repo):
             yield repo.fs.path.normpath(path)


-def _collect_metrics(repo, targets, revision, recursive):
+def _collect_metrics(repo, targets, recursive):
     metrics, fs_paths = collect(
-        repo,
-        targets=targets,
-        output_filter=_is_metric,
-        recursive=recursive,
-        rev=revision,
+        repo, targets=targets, output_filter=_is_metric, recursive=recursive
     )
     return _to_fs_paths(metrics) + list(fs_paths)

@@ -109,7 +105,7 @@ def _read_metrics(repo, metrics, rev, onerror=None):


 def _gather_metrics(repo, targets, rev, recursive, onerror=None):
-    metrics = _collect_metrics(repo, targets, rev, recursive)
+    metrics = _collect_metrics(repo, targets, recursive)
     metrics.extend(_collect_top_level_metrics(repo))
     return _read_metrics(repo, metrics, rev, onerror=onerror)

diff --git a/dvc/repo/params/show.py b/dvc/repo/params/show.py
index c432f89d84..58d13e31ee 100644
--- a/dvc/repo/params/show.py
+++ b/dvc/repo/params/show.py
@@ -52,7 +52,7 @@ def _collect_top_level_params(repo):


 def _collect_configs(
-    repo: "Repo", rev, targets=None, deps=False, stages=None
+    repo: "Repo", targets=None, deps=False, stages=None
 ) -> Tuple[List["Output"], List[str]]:

     params, fs_paths = collect(
@@ -60,7 +60,6 @@
         targets=targets or [],
         deps=True,
         output_filter=_is_params,
-        rev=rev,
         duplicates=deps or stages is not None,
     )
     all_fs_paths = fs_paths + [p.fs_path for p in params]
@@ -156,7 +155,6 @@
     for branch in repo.brancher(revs=revs):
         params = error_handler(_gather_params)(
             repo=repo,
-            rev=branch,
             targets=targets,
             deps=deps,
             onerror=onerror,
@@ -188,11 +186,9 @@
     return res


-def _gather_params(
-    repo, rev, targets=None, deps=False, onerror=None, stages=None
-):
+def _gather_params(repo, targets=None, deps=False, onerror=None, stages=None):
     param_outs, params_fs_paths = _collect_configs(
-        repo, rev, targets=targets, deps=deps, stages=stages
+        repo, targets=targets, deps=deps, stages=stages
     )
     params_fs_paths.extend(_collect_top_level_params(repo=repo))
     params = _read_params(
diff --git a/dvc/repo/plots/__init__.py b/dvc/repo/plots/__init__.py
index 74f12a4f4a..0e81b02583 100644
--- a/dvc/repo/plots/__init__.py
+++ b/dvc/repo/plots/__init__.py
@@ -5,6 +5,7 @@
 from collections import OrderedDict, defaultdict
 from copy import deepcopy
 from functools import partial
+from multiprocessing import cpu_count
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -24,6 +25,7 @@
 from dvc.exceptions import DvcException
 from dvc.utils import error_handler, errored_revisions, onerror_collect
 from dvc.utils.serialize import LOADERS
+from dvc.utils.threadpool import ThreadPoolExecutor

 if TYPE_CHECKING:
     from dvc.output import Output
@@ -136,7 +138,6 @@
                 data_targets = _get_data_targets(definitions)

                 res[rev]["sources"] = self._collect_data_sources(
-                    revision=rev,
                     targets=data_targets,
                     recursive=recursive,
                     props=props,
@@ -148,7 +149,6 @@
     def _collect_data_sources(
         self,
         targets: Optional[List[str]] = None,
-        revision: Optional[str] = None,
         recursive: bool = False,
         props: Optional[Dict] = None,
         onerror: Optional[Callable] = None,
@@ -159,7 +159,7 @@

         props = props or {}

-        plots = _collect_plots(self.repo, targets, revision, recursive)
+        plots = _collect_plots(self.repo, targets, recursive)
         res: Dict[str, Any] = {}
         for fs_path, rev_props in plots.items():
             joined_props = {**rev_props, **props}
@@ -270,19 +270,33 @@ def _is_plot(out: "Output") -> bool:


 def _resolve_data_sources(plots_data: Dict):
-    for value in plots_data.values():
+    values = list(plots_data.values())
+    to_resolve = []
+    while values:
+        value = values.pop()
         if isinstance(value, dict):
             if "data_source" in value:
-                data_source = value.pop("data_source")
-                assert callable(data_source)
-                value.update(data_source())
-            _resolve_data_sources(value)
+                to_resolve.append(value)
+            values.extend(value.values())
+
+    def resolve(value):
+        data_source = value.pop("data_source")
+        assert callable(data_source)
+        value.update(data_source())
+
+    executor = ThreadPoolExecutor(
+        max_workers=4 * cpu_count(),
+        thread_name_prefix="resolve_data",
+        cancel_on_error=True,
+    )
+    with executor:
+        # imap_unordered is lazy, wrapping to trigger it
+        list(executor.imap_unordered(resolve, to_resolve))


 def _collect_plots(
     repo: "Repo",
     targets: List[str] = None,
-    rev: str = None,
     recursive: bool = False,
 ) -> Dict[str, Dict]:
     from dvc.repo.collect import collect

@@ -291,7 +305,6 @@
         repo,
         output_filter=_is_plot,
         targets=targets,
-        rev=rev,
         recursive=recursive,
     )
diff --git a/tests/func/metrics/test_show.py b/tests/func/metrics/test_show.py
index e45a38e73b..1b4c207d3c 100644
--- a/tests/func/metrics/test_show.py
+++ b/tests/func/metrics/test_show.py
@@ -1,4 +1,3 @@
-import logging
 import os

 import pytest
@@ -258,13 +257,8 @@ def test_show_malformed_metric(tmp_dir, scm, dvc, caplog):
     )


-def test_metrics_show_no_target(tmp_dir, dvc, caplog):
-    with caplog.at_level(logging.WARNING):
-        assert dvc.metrics.show(targets=["metrics.json"]) == {"": {}}
-
-    assert (
-        "'metrics.json' was not found in current workspace." in caplog.messages
-    )
+def test_metrics_show_no_target(tmp_dir, dvc, capsys):
+    assert dvc.metrics.show(targets=["metrics.json"]) == {"": {}}


 def test_show_no_metrics_files(tmp_dir, dvc, caplog):
diff --git a/tests/func/plots/test_show.py b/tests/func/plots/test_show.py
index 5cb55841c0..def95b8741 100644
--- a/tests/func/plots/test_show.py
+++ b/tests/func/plots/test_show.py
@@ -128,14 +128,18 @@ def test_show_from_subdir(tmp_dir, dvc, capsys):
     assert (subdir / "dvc_plots" / "index.html").is_file()


-def test_plots_show_non_existing(tmp_dir, dvc, caplog):
+def test_plots_show_non_existing(tmp_dir, dvc, capsys):
     result = dvc.plots.show(targets=["plot.json"])
     assert isinstance(
         get_plot(result, "workspace", file="plot.json", endkey="error"),
         FileNotFoundError,
     )

-    assert "'plot.json' was not found in current workspace." in caplog.text
+    cap = capsys.readouterr()
+    assert (
+        "DVC failed to load some plots for following revisions: 'workspace'"
+        in cap.err
+    )


 @pytest.mark.parametrize("clear_before_run", [True, False])
diff --git a/tests/unit/test_collect.py b/tests/unit/test_collect.py
index 0e64c42126..456a4502a1 100644
--- a/tests/unit/test_collect.py
+++ b/tests/unit/test_collect.py
@@ -1,15 +1,6 @@
-import logging
-
 from dvc.repo.collect import collect


-def test_no_file_on_target_rev(tmp_dir, scm, dvc, caplog):
-    with caplog.at_level(logging.WARNING, "dvc"):
-        collect(dvc, targets=["file.yaml"], rev="current_branch")
-
-    assert "'file.yaml' was not found at: 'current_branch'." in caplog.text
-
-
 def test_collect_duplicates(tmp_dir, scm, dvc):
     tmp_dir.gen("params.yaml", "foo: 1\nbar: 2")
     tmp_dir.gen("foobar", "")
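
Note on the parallel collection change in dvc/repo/plots/__init__.py: instead of recursively resolving each "data_source" callable inline, the new _resolve_data_sources walks the nested plots data iteratively, queues every dict entry that still holds a lazy "data_source" callable, and then resolves all of them concurrently on a thread pool sized at 4 * cpu_count(), which suits the I/O-bound work of loading plot files. The sketch below is a minimal standalone approximation of that pattern: it substitutes the standard library concurrent.futures.ThreadPoolExecutor for DVC's dvc.utils.threadpool wrapper (so imap_unordered and cancel_on_error are approximated by executor.map), and the resolve_data_sources name and demo data are hypothetical, not part of the patch.

# Sketch of the iterative walk + thread-pool resolution pattern from the
# patch, using only the standard library (assumptions noted above).
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from typing import Any, Dict


def resolve_data_sources(plots_data: Dict[str, Any]) -> None:
    # Walk the nested dict iteratively and gather every entry that still
    # carries a lazy "data_source" callable.
    values = list(plots_data.values())
    to_resolve = []
    while values:
        value = values.pop()
        if isinstance(value, dict):
            if "data_source" in value:
                to_resolve.append(value)
            values.extend(value.values())

    def resolve(value: Dict[str, Any]) -> None:
        # Each data_source is a zero-argument callable whose result
        # (e.g. {"data": ...} or {"error": ...}) is merged in place.
        data_source = value.pop("data_source")
        value.update(data_source())

    # Loading plot files is I/O bound, so oversubscribe threads relative to
    # the CPU count, mirroring the 4 * cpu_count() choice in the patch.
    with ThreadPoolExecutor(max_workers=4 * cpu_count()) as executor:
        # map() is lazy; list() forces every source to resolve before return.
        list(executor.map(resolve, to_resolve))


if __name__ == "__main__":
    # Hypothetical toy data mimicking the plots_data shape.
    demo = {
        "workspace": {
            "sources": {
                "plot.json": {"data_source": lambda: {"data": [1, 2, 3]}},
                "metrics.json": {"data_source": lambda: {"data": [4, 5]}},
            }
        }
    }
    resolve_data_sources(demo)
    print(demo)

With the workspace-existence check removed from dvc/repo/collect.py, a missing target no longer produces a logger warning during collection; as the updated tests show, the failure now surfaces when the data source is read, as a FileNotFoundError attached to the plot entry and a summary line on stderr ("DVC failed to load some plots for following revisions: 'workspace'").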