diff --git a/dvc/cli.py b/dvc/cli.py index be0c0f6e7a..5ffdc1347f 100644 --- a/dvc/cli.py +++ b/dvc/cli.py @@ -10,6 +10,7 @@ commit, config, daemon, + dag, data_sync, destroy, diff, @@ -26,7 +27,6 @@ metrics, move, params, - pipeline, plots, remote, remove, @@ -67,7 +67,7 @@ root, ls, freeze, - pipeline, + dag, daemon, commit, diff, diff --git a/dvc/command/dag.py b/dvc/command/dag.py new file mode 100644 index 0000000000..104af837a3 --- /dev/null +++ b/dvc/command/dag.py @@ -0,0 +1,115 @@ +import argparse +import logging + +from dvc.command.base import CmdBase, append_doc_link +from dvc.exceptions import DvcException + +logger = logging.getLogger(__name__) + + +def _show_ascii(G): + from dvc.repo.graph import get_pipelines + from dvc.dagascii import draw + + pipelines = get_pipelines(G) + + ret = [] + for pipeline in pipelines: + ret.append(draw(pipeline.nodes, pipeline.edges)) + + return "\n".join(ret) + + +def _show_dot(G): + import io + from networkx.drawing.nx_pydot import write_dot + + dot_file = io.StringIO() + write_dot(G, dot_file) + return dot_file.getvalue() + + +def _build(G, target=None, full=False): + import networkx as nx + from dvc.repo.graph import get_pipeline, get_pipelines + + if target: + H = get_pipeline(get_pipelines(G), target) + if not full: + descendants = nx.descendants(G, target) + descendants.add(target) + H.remove_nodes_from(set(G.nodes()) - descendants) + else: + H = G + + def _relabel(stage): + return stage.addressing + + return nx.relabel_nodes(H, _relabel, copy=False) + + +class CmdDAG(CmdBase): + def run(self): + try: + target = None + if self.args.target: + stages = self.repo.collect(self.args.target) + if len(stages) > 1: + logger.error( + f"'{self.args.target}' contains more than one stage " + "{stages}, please specify one stage" + ) + return 1 + target = stages[0] + + G = _build(self.repo.graph, target=target, full=self.args.full,) + + if self.args.dot: + logger.info(_show_dot(G)) + else: + from dvc.utils.pager import pager + + pager(_show_ascii(G)) + + return 0 + except DvcException: + msg = "failed to show" + if self.args.target: + msg += f"a pipeline for '{target}'" + else: + msg += "pipelines" + logger.exception(msg) + return 1 + + +def add_parser(subparsers, parent_parser): + DAG_HELP = "Visualize DVC project DAG." + dag_parser = subparsers.add_parser( + "dag", + parents=[parent_parser], + description=append_doc_link(DAG_HELP, "dag"), + help=DAG_HELP, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + dag_parser.add_argument( + "--dot", + action="store_true", + default=False, + help="Print DAG with .dot format.", + ) + dag_parser.add_argument( + "--full", + action="store_true", + default=False, + help=( + "Show full DAG that the target belongs too, instead of " + "showing DAG consisting only of ancestors." + ), + ) + dag_parser.add_argument( + "target", + nargs="?", + help="Stage or output to show pipeline for. Optional. " + "(Finds all stages in the workspace by default.)", + ) + dag_parser.set_defaults(func=CmdDAG) diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py deleted file mode 100644 index eb7141a887..0000000000 --- a/dvc/command/pipeline.py +++ /dev/null @@ -1,265 +0,0 @@ -import argparse -import logging - -from dvc.command.base import CmdBase, append_doc_link, fix_subparsers -from dvc.exceptions import DvcException - -logger = logging.getLogger(__name__) - - -class CmdPipelineShow(CmdBase): - def _show(self, target, commands, outs, locked): - import networkx - from dvc.utils import parse_target - - path, name = parse_target(target) - stage = self.repo.get_stage(path, name) - G = self.repo.graph - stages = networkx.dfs_postorder_nodes(G, stage) - if locked: - stages = [s for s in stages if s.frozen] - - for stage in stages: - if commands: - if stage.cmd is None: - continue - logger.info(stage.cmd) - elif outs: - for out in stage.outs: - logger.info(str(out)) - else: - logger.info(stage.addressing) - - @staticmethod - def _build_output_graph(G, target_stage): - import networkx - from itertools import product - - nodes = {str(out) for out in target_stage.outs} - edges = [] - - for from_stage, to_stage in networkx.edge_dfs(G, target_stage): - from_stage_deps = {dep.path_info.parts for dep in from_stage.deps} - to_outs = { - to_out - for to_out in to_stage.outs - if to_out.path_info.parts in from_stage_deps - } - from_outs = { - from_out - for from_out in from_stage.outs - if str(from_out) in nodes - } - nodes |= {str(to_out) for to_out in to_outs} - for from_out, to_out in product(from_outs, to_outs): - edges.append((str(from_out), str(to_out))) - return nodes, edges - - def _build_graph(self, target, commands=False, outs=False): - import networkx - from dvc.repo.graph import get_pipeline - from dvc.utils import parse_target - - path, name = parse_target(target) - target_stage = self.repo.get_stage(path, name) - G = get_pipeline(self.repo.pipelines, target_stage) - - nodes = set() - for stage in networkx.dfs_preorder_nodes(G, target_stage): - if commands: - if stage.cmd is None: - continue - nodes.add(stage.cmd) - elif not outs: - nodes.add(stage.addressing) - - edges = [] - for from_stage, to_stage in networkx.edge_dfs(G, target_stage): - if commands: - if to_stage.cmd is None: - continue - edges.append((from_stage.cmd, to_stage.cmd)) - elif not outs: - edges.append((from_stage.addressing, to_stage.addressing)) - - if outs: - nodes, edges = self._build_output_graph(G, target_stage) - - return list(nodes), edges, networkx.is_tree(G) - - def _show_ascii(self, target, commands, outs): - from dvc.dagascii import draw - - nodes, edges, _ = self._build_graph(target, commands, outs) - - if not nodes: - return - - draw(nodes, edges) - - def _show_dependencies_tree(self, target, commands, outs): - from treelib import Tree - - nodes, edges, is_tree = self._build_graph(target, commands, outs) - if not nodes: - return - if not is_tree: - raise DvcException( - "DAG is not a tree, can not print it in tree-structure way, " - "please use --ascii instead" - ) - - tree = Tree() - tree.create_node(target, target) # Root node - observe_list = [target] - while len(observe_list) > 0: - current_root = observe_list[0] - for edge in edges: - if edge[0] == current_root: - tree.create_node(edge[1], edge[1], parent=current_root) - observe_list.append(edge[1]) - observe_list.pop(0) - tree.show() - - def _write_dot(self, target, commands, outs): - import io - import networkx - from networkx.drawing.nx_pydot import write_dot - - _, edges, _ = self._build_graph(target, commands, outs) - edges = [edge[::-1] for edge in edges] - - simple_g = networkx.DiGraph() - simple_g.add_edges_from(edges) - - dot_file = io.StringIO() - write_dot(simple_g, dot_file) - logger.info(dot_file.getvalue()) - - def run(self): - from dvc.dvcfile import DVC_FILE - - if not self.args.targets: - self.args.targets = [DVC_FILE] - - for target in self.args.targets: - try: - if self.args.ascii: - self._show_ascii( - target, self.args.commands, self.args.outs - ) - elif self.args.dot: - self._write_dot(target, self.args.commands, self.args.outs) - elif self.args.tree: - self._show_dependencies_tree( - target, self.args.commands, self.args.outs - ) - else: - self._show( - target, - self.args.commands, - self.args.outs, - self.args.locked, - ) - except DvcException: - msg = f"failed to show pipeline for '{target}'" - logger.exception(msg) - return 1 - return 0 - - -class CmdPipelineList(CmdBase): - def run(self): - pipelines = self.repo.pipelines - for pipeline in pipelines: - for stage in pipeline: - logger.info(stage.addressing) - if len(pipeline) != 0: - logger.info("=" * 80) - logger.info("{} pipelines total".format(len(pipelines))) - - return 0 - - -def add_parser(subparsers, parent_parser): - PIPELINE_HELP = "Manage pipelines." - pipeline_parser = subparsers.add_parser( - "pipeline", - parents=[parent_parser], - description=append_doc_link(PIPELINE_HELP, "pipeline"), - help=PIPELINE_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - - pipeline_subparsers = pipeline_parser.add_subparsers( - dest="cmd", - help="Use `dvc pipeline CMD --help` for command-specific help.", - ) - - fix_subparsers(pipeline_subparsers) - - PIPELINE_LIST_HELP = "List connected groups of stages (pipelines)." - pipeline_list_parser = pipeline_subparsers.add_parser( - "list", - parents=[parent_parser], - description=append_doc_link(PIPELINE_LIST_HELP, "pipeline/list"), - help=PIPELINE_LIST_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - pipeline_list_parser.set_defaults(func=CmdPipelineList) - - PIPELINE_SHOW_HELP = "Show stages in a pipeline." - pipeline_show_parser = pipeline_subparsers.add_parser( - "show", - parents=[parent_parser], - description=append_doc_link(PIPELINE_SHOW_HELP, "pipeline/show"), - help=PIPELINE_SHOW_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - pipeline_show_group = pipeline_show_parser.add_mutually_exclusive_group() - pipeline_show_group.add_argument( - "-c", - "--commands", - action="store_true", - default=False, - help="Print commands instead of paths to DVC-files.", - ) - pipeline_show_group.add_argument( - "-o", - "--outs", - action="store_true", - default=False, - help="Print output files instead of paths to DVC-files.", - ) - pipeline_show_parser.add_argument( - "-l", - "--locked", - action="store_true", - default=False, - help="Print locked DVC stages", - ) - pipeline_show_parser.add_argument( - "--ascii", - action="store_true", - default=False, - help="Output DAG as ASCII.", - ) - pipeline_show_parser.add_argument( - "--dot", - action="store_true", - default=False, - help="Print DAG with .dot format.", - ) - pipeline_show_parser.add_argument( - "--tree", - action="store_true", - default=False, - help="Output DAG as Dependencies Tree.", - ) - pipeline_show_parser.add_argument( - "targets", - nargs="*", - help="DVC-files to show pipeline for. Optional. " - "(Finds all DVC-files in the workspace by default.)", - ) - pipeline_show_parser.set_defaults(func=CmdPipelineShow) diff --git a/dvc/dagascii.py b/dvc/dagascii.py index f1728ae130..419fc9965e 100644 --- a/dvc/dagascii.py +++ b/dvc/dagascii.py @@ -96,10 +96,9 @@ def __init__(self, cols, lines): def draw(self): """Draws ASCII canvas on the screen.""" - pager = find_pager() lines = map("".join, self.canvas) joined_lines = os.linesep.join(lines) - pager(joined_lines) + return joined_lines def point(self, x, y, char): """Create a point on ASCII canvas. @@ -316,4 +315,4 @@ def draw(vertexes, edges): int(round(x - minx)) + 1, int(round(y - miny)) + 1, vertex.data ) - canvas.draw() + return canvas.draw() diff --git a/dvc/utils/pager.py b/dvc/utils/pager.py new file mode 100644 index 0000000000..17431bdf9e --- /dev/null +++ b/dvc/utils/pager.py @@ -0,0 +1,48 @@ +"""Draws DAG in ASCII.""" + +import logging +import os +import pydoc +import sys + +from dvc.env import DVC_PAGER +from dvc.utils import format_link + +logger = logging.getLogger(__name__) + + +DEFAULT_PAGER = "less" +DEFAULT_PAGER_FORMATTED = "{} --chop-long-lines --clear-screen".format( + DEFAULT_PAGER +) + + +def make_pager(cmd): + def _pager(text): + return pydoc.tempfilepager(pydoc.plain(text), cmd) + + return _pager + + +def find_pager(): + if not sys.stdout.isatty(): + return pydoc.plainpager + + env_pager = os.getenv(DVC_PAGER) + if env_pager: + return make_pager(env_pager) + + if os.system(f"({DEFAULT_PAGER}) 2>{os.devnull}") == 0: + return make_pager(DEFAULT_PAGER_FORMATTED) + + logger.warning( + "Unable to find `less` in the PATH. Check out " + "{} for more info.".format( + format_link("https://man.dvc.org/pipeline/show") + ) + ) + return pydoc.plainpager + + +def pager(text): + find_pager()(text) diff --git a/scripts/completion/dvc.bash b/scripts/completion/dvc.bash index b5562a8593..5315e1cbb9 100644 --- a/scripts/completion/dvc.bash +++ b/scripts/completion/dvc.bash @@ -4,9 +4,10 @@ # - https://opensource.com/article/18/3/creating-bash-completion-script # - https://stackoverflow.com/questions/12933362 -_dvc_commands='add cache checkout commit config destroy diff fetch freeze get-url get gc \ - import-url import init install list metrics move params pipeline plots pull push \ - remote remove repro root run status unfreeze unprotect update version' +_dvc_commands='add cache checkout commit config dag destroy diff fetch freeze \ + get-url get gc import-url import init install list metrics move params \ + plots pull push remote remove repro root run status unfreeze unprotect \ + update version' _dvc_options='-h --help -V --version' _dvc_global_options='-h --help -q --quiet -v --verbose' @@ -20,6 +21,8 @@ _dvc_checkout_COMPGEN=_dvc_compgen_DVCFiles _dvc_commit='-f --force -d --with-deps -R --recursive' _dvc_commit_COMPGEN=_dvc_compgen_DVCFiles _dvc_config='-u --unset --local --system --global' +_dvc_dag='--dot --full' +_dvc_dag_COMPGEN=_dvc_compgen_DVCFiles _dvc_destroy='-f --force' _dvc_diff='-t --show-json --show-hash --show-md' _dvc_fetch='-j --jobs -r --remote -a --all-branches -T --all-tags -d --with-deps -R --recursive' @@ -49,10 +52,6 @@ _dvc_move='' _dvc_move_COMPGEN=_dvc_compgen_files _dvc_params='diff' _dvc_params_diff='--all --show-json --show-md --no-path' -_dvc_pipeline='list show' -_dvc_pipeline_list='' -_dvc_pipeline_show='-c --commands -o --outs --ascii --dot --tree -l --locked' -_dvc_pipeline_show_COMPGEN=_dvc_compgen_DVCFiles _dvc_plots='show diff' _dvc_plots_show='-t --template -o --out -x -y --show-json --no-csv-header --title --xlab --ylab' _dvc_plots_diff='-t --template --targets -o --out -x -y --show-json --no-csv-header --title --xlab --ylab' diff --git a/scripts/completion/dvc.zsh b/scripts/completion/dvc.zsh index 29482a3998..b04155db92 100644 --- a/scripts/completion/dvc.zsh +++ b/scripts/completion/dvc.zsh @@ -17,6 +17,7 @@ _dvc_commands() { "checkout:Checkout data files from cache." "commit:Save changed data to cache and update DVC-files." "config:Get or set config settings." + "dag:Visualize DVC project DAG." "destroy:Remove DVC-files, local DVC config and data cache." "diff:Show added, modified, or deleted data between commits in the DVC repository, or between a commit and the workspace." "fetch:Get files or directories tracked by DVC from remote storage into the cache." @@ -32,7 +33,6 @@ _dvc_commands() { "metrics:Commands to add, manage, collect and display metrics." "move:Rename or move a DVC controlled data file or a directory." "params:Commands to display params." - "pipeline:Manage pipelines." "pull:Pull data files from a DVC remote storage." "push:Push data files to a DVC remote storage." "plots:Generate plot for metrics structured as JSON, CSV or TSV." @@ -96,6 +96,12 @@ _dvc_config=( {-u,--unset}"[Unset option.]" ) +_dvc_dag=( + "--dot[Print DAG in DOT format.]" + "--full[Show full DAG that the target belongs too, instead of showing DAG consisting only of ancestors.]" + "1:Stage:" +) + _dvc_destroy=( {-f,--force}"[Force destruction.]" ) @@ -188,10 +194,6 @@ _dvc_params=( "1:Sub command:(diff)" ) -_dvc_pipeline=( - "1:Sub command:(show list)" -) - _dvc_pull=( {-j,--jobs}"[Number of jobs to run simultaneously.]:Number of jobs:" {-r,--remote}"[Remote repository to pull from.]:Remote repository:" @@ -305,6 +307,7 @@ case $words[1] in checkout) _arguments $_dvc_global_options $_dvc_checkout ;; commit) _arguments $_dvc_global_options $_dvc_commit ;; config) _arguments $_dvc_global_options $_dvc_config ;; + dag) _arguments $_dvc_global_options $_dvc_dag ;; destroy) _arguments $_dvc_global_options $_dvc_destroy ;; diff) _arguments $_dvc_global_options $_dvc_diff ;; fetch) _arguments $_dvc_global_options $_dvc_fetch ;; @@ -320,7 +323,6 @@ case $words[1] in metrics) _arguments $_dvc_global_options $_dvc_metrics ;; move) _arguments $_dvc_global_options $_dvc_move ;; params) _arguments $_dvc_global_options $_dvc_params ;; - pipeline) _arguments $_dvc_global_options $_dvc_pipeline ;; pull) _arguments $_dvc_global_options $_dvc_pull ;; push) _arguments $_dvc_global_options $_dvc_push ;; plots) _arguments $_dvc_global_options $_dvc_plots ;; diff --git a/setup.cfg b/setup.cfg index 723c7fa14e..f756962f00 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,6 +12,6 @@ select=B,C,E,F,W,T4,B9 [isort] include_trailing_comma=true known_first_party=dvc,tests -known_third_party=PyInstaller,RangeHTTPServer,boto3,colorama,configobj,distro,dpath,flaky,flufl,funcy,git,google,grandalf,mock,mockssh,moto,nanotime,packaging,paramiko,pathspec,pytest,requests,ruamel,setuptools,shortuuid,tqdm,voluptuous,yaml,zc +known_third_party=PyInstaller,RangeHTTPServer,boto3,colorama,configobj,distro,dpath,flaky,flufl,funcy,git,google,grandalf,mock,mockssh,moto,nanotime,networkx,packaging,paramiko,pathspec,pytest,requests,ruamel,setuptools,shortuuid,tqdm,voluptuous,yaml,zc line_length=79 multi_line_output=3 diff --git a/tests/func/test_pipeline.py b/tests/func/test_pipeline.py deleted file mode 100644 index 6c42ffd76d..0000000000 --- a/tests/func/test_pipeline.py +++ /dev/null @@ -1,354 +0,0 @@ -import logging - -from dvc.command.pipeline import CmdPipelineList, CmdPipelineShow -from dvc.main import main -from tests.basic_env import TestDvc -from tests.func.test_repro import TestRepro, TestReproChangedDeepData - - -class TestPipelineShowSingle(TestDvc): - def setUp(self): - super().setUp() - self.stage = "foo.dvc" - ret = main(["add", self.FOO]) - self.assertEqual(ret, 0) - - def test(self): - ret = main(["pipeline", "show", self.stage]) - self.assertEqual(ret, 0) - - def test_commands(self): - ret = main(["pipeline", "show", self.stage, "--commands"]) - self.assertEqual(ret, 0) - - def test_outs(self): - ret = main(["pipeline", "show", self.stage, "--outs"]) - self.assertEqual(ret, 0) - - def test_dot(self): - ret = main(["pipeline", "show", "--dot", self.stage]) - self.assertEqual(ret, 0) - - def test_tree(self): - ret = main(["pipeline", "show", "--tree", self.stage]) - self.assertEqual(ret, 0) - - def test_ascii_outs(self): - ret = main(["pipeline", "show", "--ascii", self.stage, "--outs"]) - self.assertEqual(ret, 0) - - def test_dot_commands(self): - ret = main(["pipeline", "show", "--dot", self.stage, "--commands"]) - self.assertEqual(ret, 0) - - def test_dot_outs(self): - ret = main(["pipeline", "show", "--dot", self.stage, "--outs"]) - self.assertEqual(ret, 0) - - def test_not_dvc_file(self): - ret = main(["pipeline", "show", self.FOO]) - self.assertNotEqual(ret, 0) - - def test_non_existing(self): - ret = main(["pipeline", "show", "non-existing"]) - self.assertNotEqual(ret, 0) - - -def test_single_ascii(tmp_dir, dvc): - tmp_dir.dvc_gen("foo", "foo content") - assert main(["pipeline", "show", "--ascii", "foo.dvc"]) == 0 - - -def test_single_ascii_commands(tmp_dir, dvc): - tmp_dir.dvc_gen("foo", "foo content") - assert main(["pipeline", "show", "--ascii", "foo.dvc", "--commands"]) == 0 - - -class TestPipelineShow(TestRepro): - def test(self): - ret = main(["pipeline", "show", self.file1_stage]) - self.assertEqual(ret, 0) - - def test_commands(self): - ret = main(["pipeline", "show", self.file1_stage, "--commands"]) - self.assertEqual(ret, 0) - - def test_ascii(self): - ret = main(["pipeline", "show", "--ascii", self.file1_stage]) - self.assertEqual(ret, 0) - - def test_dot(self): - ret = main(["pipeline", "show", "--dot", self.file1_stage]) - self.assertEqual(ret, 0) - - def test_ascii_commands(self): - ret = main( - ["pipeline", "show", "--ascii", self.file1_stage, "--commands"] - ) - self.assertEqual(ret, 0) - - def test_ascii_outs(self): - ret = main(["pipeline", "show", "--ascii", self.file1_stage, "--outs"]) - self.assertEqual(ret, 0) - - def test_dot_commands(self): - ret = main( - ["pipeline", "show", "--dot", self.file1_stage, "--commands"] - ) - self.assertEqual(ret, 0) - - -def test_disconnected_stage(tmp_dir, dvc): - tmp_dir.dvc_gen({"base": "base"}) - - dvc.add("base") - dvc.run( - deps=["base"], - outs=["derived1"], - cmd="echo derived1 > derived1", - single_stage=True, - ) - dvc.run( - deps=["base"], - outs=["derived2"], - cmd="echo derived2 > derived2", - single_stage=True, - ) - final_stage = dvc.run( - deps=["derived1"], - outs=["final"], - cmd="echo final > final", - single_stage=True, - ) - - command = CmdPipelineShow([]) - # Need to test __build_graph directly - nodes, edges, is_tree = command._build_graph( - final_stage.path, commands=False, outs=True - ) - - assert set(nodes) == {"final", "derived1", "base"} - assert edges == [("final", "derived1"), ("derived1", "base")] - assert is_tree is True - - -def test_print_locked_stages(tmp_dir, dvc, caplog): - tmp_dir.dvc_gen({"foo": "foo content", "bar": "bar content"}) - dvc.freeze("foo.dvc") - - caplog.clear() - with caplog.at_level(logging.INFO, logger="dvc"): - assert main(["pipeline", "show", "foo.dvc", "--locked"]) == 0 - - assert "foo.dvc" in caplog.text - assert "bar.dvc" not in caplog.text - - -def test_dot_outs(tmp_dir, dvc, run_copy): - tmp_dir.gen("foo", "foo content") - run_copy("foo", "file", single_stage=True) - assert main(["pipeline", "show", "--dot", "file.dvc", "--outs"]) == 0 - - -class TestPipelineShowOuts(TestRepro): - def setUp(self): - super().setUp() - - def test_outs(self): - ret = main(["pipeline", "show", self.file1_stage, "--outs"]) - self.assertEqual(ret, 0) - - -class TestPipelineShowDeep(TestReproChangedDeepData): - def test(self): - ret = main(["pipeline", "show", self.file1_stage]) - self.assertEqual(ret, 0) - - def test_commands(self): - ret = main(["pipeline", "show", self.file1_stage, "--commands"]) - self.assertEqual(ret, 0) - - def test_outs(self): - ret = main(["pipeline", "show", self.file1_stage, "--outs"]) - self.assertEqual(ret, 0) - - def test_ascii(self): - ret = main(["pipeline", "show", "--ascii", self.file1_stage]) - self.assertEqual(ret, 0) - - def test_dot(self): - ret = main(["pipeline", "show", "--dot", self.file1_stage]) - self.assertEqual(ret, 0) - - def test_ascii_commands(self): - ret = main( - ["pipeline", "show", "--ascii", self.file1_stage, "--commands"] - ) - self.assertEqual(ret, 0) - - def test_ascii_outs(self): - ret = main(["pipeline", "show", "--ascii", self.file1_stage, "--outs"]) - self.assertEqual(ret, 0) - - def test_dot_commands(self): - ret = main( - ["pipeline", "show", "--dot", self.file1_stage, "--commands"] - ) - self.assertEqual(ret, 0) - - def test_dot_outs(self): - ret = main(["pipeline", "show", "--dot", self.file1_stage, "--outs"]) - self.assertEqual(ret, 0) - - -class TestPipelineListEmpty(TestDvc): - def test(self): - ret = main(["pipeline", "list"]) - self.assertEqual(ret, 0) - - -class TestPipelineListSingle(TestPipelineShowDeep): - def test(self): - ret = main(["pipeline", "list"]) - self.assertEqual(ret, 0) - - -class TestDvcRepoPipeline(TestDvc): - def test_no_stages(self): - pipelines = self.dvc.pipelines - self.assertEqual(len(pipelines), 0) - - def one_pipeline(self): - self.dvc.add("foo") - self.dvc.run(deps=["foo"], outs=["bar"], cmd="", single_stage=True) - self.dvc.run( - deps=["bar"], outs=["baz"], cmd="echo baz > baz", single_stage=True - ) - pipelines = self.dvc.pipelines - - self.assertEqual(len(pipelines), 1) - self.assertEqual(pipelines[0].nodes, 3) - self.assertEqual(pipelines[0].edges, 2) - - def two_pipelines(self): - self.dvc.add("foo") - self.dvc.run(deps=["foo"], outs=["bar"], cmd="", single_stage=True) - self.dvc.run( - deps=["bar"], outs=["baz"], cmd="echo baz > baz", single_stage=True - ) - - self.dvc.add("code.py") - - pipelines = self.dvc.pipelines - - self.assertEqual(len(pipelines), 2) - self.assertEqual(pipelines[0].nodes, 3) - self.assertEqual(pipelines[0].edges, 2) - self.assertEqual(pipelines[1].nodes, 1) - self.assertEqual(pipelines[1].edges, 0) - - def locked_stage(self): - self.dvc.add("foo") - self.dvc.freeze("foo.dvc") - - pipelines = self.dvc.pipelines - self.assertEqual(len(pipelines), 0) - - -def test_split_pipeline(tmp_dir, scm, dvc): - tmp_dir.scm_gen("git_dep1", "git_dep1") - tmp_dir.scm_gen("git_dep2", "git_dep2") - - tmp_dir.dvc_gen("data", "source file content") - dvc.run( - deps=["git_dep1", "data"], - outs=["data_train", "data_valid"], - cmd="echo train >> data_train && echo valid >> data_valid", - single_stage=True, - ) - stage = dvc.run( - deps=["git_dep2", "data_train", "data_valid"], - outs=["result"], - cmd="echo result >> result", - single_stage=True, - ) - - command = CmdPipelineShow([]) - nodes, edges, _ = command._build_graph( - stage.path, commands=False, outs=True - ) - assert set(nodes) == {"data", "data_train", "data_valid", "result"} - assert set(edges) == { - ("result", "data_train"), - ("result", "data_valid"), - ("data_train", "data"), - ("data_valid", "data"), - } - - -def test_pipeline_list_show_multistage(tmp_dir, dvc, run_copy, caplog): - tmp_dir.gen("foo", "foo") - run_copy("foo", "bar", name="copy-foo-bar") - run_copy("bar", "foobar", single_stage=True) - command = CmdPipelineShow([]) - - caplog.clear() - with caplog.at_level(logging.INFO, "dvc"): - command._show("foobar.dvc", False, False, False) - output = caplog.text.splitlines() - assert "copy-foo-bar" in output[0] - assert "foobar.dvc" in output[1] - - caplog.clear() - with caplog.at_level(logging.INFO, "dvc"): - command._show("copy-foo-bar", False, False, False) - assert "copy-foo-bar" in caplog.text - assert "foobar.dvc" not in caplog.text - - command = CmdPipelineList([]) - caplog.clear() - with caplog.at_level(logging.INFO, "dvc"): - command.run() - assert "copy-foo-bar" in caplog.text - assert "foobar.dvc" in caplog.text - assert "1 pipelines in total" - - -def test_pipeline_ascii_multistage(tmp_dir, dvc, run_copy): - tmp_dir.gen("foo", "foo") - run_copy("foo", "bar", name="copy-foo-bar") - run_copy("bar", "foobar", single_stage=True) - command = CmdPipelineShow([]) - nodes, edges, _ = command._build_graph("foobar.dvc") - assert set(nodes) == {"copy-foo-bar", "foobar.dvc"} - assert set(edges) == { - ("foobar.dvc", "copy-foo-bar"), - } - - nodes, *_ = command._build_graph("copy-foo-bar") - assert set(nodes) == {"copy-foo-bar"} - - -def test_pipeline_multi_outputs_stages(dvc): - dvc.run( - outs=["alice", "bob"], - cmd="echo alice>alice && echo bob>bob", - single_stage=True, - ) - dvc.run( - deps=["alice"], - outs=["mary", "mike"], - cmd="echo mary>mary && echo mike>mike", - single_stage=True, - ) - stage = dvc.run( - deps=["mary"], - outs=["carol"], - cmd="echo carol>carol", - single_stage=True, - ) - - command = CmdPipelineShow([]) - nodes, edges, _ = command._build_graph(stage.path, outs=True) - assert set(nodes) == {"alice", "mary", "carol"} - assert set(edges) == {("carol", "mary"), ("mary", "alice")} diff --git a/tests/unit/command/test_dag.py b/tests/unit/command/test_dag.py new file mode 100644 index 0000000000..d854fabe57 --- /dev/null +++ b/tests/unit/command/test_dag.py @@ -0,0 +1,105 @@ +import networkx as nx +import pytest + +from dvc.cli import parse_args +from dvc.command.dag import CmdDAG, _build, _show_ascii, _show_dot + + +@pytest.mark.parametrize("fmt", [None, "--dot"]) +def test_dag(tmp_dir, dvc, mocker, fmt): + tmp_dir.dvc_gen("foo", "foo") + + args = ["dag", "--full", "foo.dvc"] + if fmt: + args.append(fmt) + cli_args = parse_args(args) + assert cli_args.func == CmdDAG + + cmd = cli_args.func(cli_args) + + mocker.patch("dvc.command.dag._build", return_value=dvc.graph) + + assert cmd.run() == 0 + + +@pytest.fixture +def graph(tmp_dir, dvc): + tmp_dir.dvc_gen("a", "a") + tmp_dir.dvc_gen("b", "b") + + dvc.run( + no_exec=True, deps=["a", "c"], outs=["d", "e"], cmd="cmd1", name="1" + ) + dvc.run( + no_exec=True, deps=["b", "c"], outs=["f", "g"], cmd="cmd2", name="2" + ) + dvc.run( + no_exec=True, + deps=["a", "b", "c"], + outs=["h", "i"], + cmd="cmd3", + name="3", + ) + dvc.run(no_exec=True, deps=["a", "h"], outs=["j"], cmd="cmd4", name="4") + + return dvc.graph + + +def test_build(graph): + assert nx.is_isomorphic(_build(graph), graph) + + +def test_build_target(graph): + (stage,) = filter( + lambda s: hasattr(s, "name") and s.name == "3", graph.nodes() + ) + G = _build(graph, target=stage) + assert set(G.nodes()) == {"3", "b.dvc", "a.dvc"} + assert set(G.edges()) == {("3", "a.dvc"), ("3", "b.dvc")} + + +def test_build_full(graph): + (stage,) = filter( + lambda s: hasattr(s, "name") and s.name == "3", graph.nodes() + ) + G = _build(graph, target=stage, full=True) + assert nx.is_isomorphic(G, graph) + + +def test_show_ascii(graph): + assert [line.rstrip() for line in _show_ascii(graph).splitlines()] == [ + " +----------------+ +----------------+", # noqa: E501 + " | stage: 'a.dvc' | | stage: 'b.dvc' |", # noqa: E501 + " *+----------------+**** +----------------+", # noqa: E501 + " ***** * ***** *** ***", # noqa: E501 + " **** * ***** ** **", # noqa: E501 + " *** * *** ** **", # noqa: E501 + "+------------+ ** +------------+ +------------+", # noqa: E501 + "| stage: '1' | ** | stage: '3' | | stage: '2' |", # noqa: E501 + "+------------+ *** +------------+ +------------+", # noqa: E501 + " ** ***", + " ** **", + " ** **", + " +------------+", + " | stage: '4' |", + " +------------+", + ] + + +def test_show_dot(graph): + assert _show_dot(graph) == ( + "strict digraph {\n" + "stage;\n" + "stage;\n" + "stage;\n" + "stage;\n" + "stage;\n" + "stage;\n" + "\"stage: '1'\" -> \"stage: 'a.dvc'\";\n" + "\"stage: '2'\" -> \"stage: 'b.dvc'\";\n" + "\"stage: '3'\" -> \"stage: 'a.dvc'\";\n" + "\"stage: '3'\" -> \"stage: 'b.dvc'\";\n" + "\"stage: '4'\" -> \"stage: 'a.dvc'\";\n" + "\"stage: '4'\" -> \"stage: '3'\";\n" + "}\n" + )