Skip to content

Commit

Permalink
dvc: add support for multistage dvcfile.
Browse files Browse the repository at this point in the history
Now, pipeline stage files can house multiple stages, and separate lockfiles
are created which hold the checksums, whereas the Dvcfile stays clean,
human-readable and editable. The *.dvc files will be generated for output files.

It is available via a hidden flag: -n.

Fixes #1871
PR: #3584
  • Loading branch information
skshetry committed Apr 17, 2020
1 parent 11e56db commit f309b79
Show file tree
Hide file tree
Showing 23 changed files with 1,735 additions and 293 deletions.
35 changes: 24 additions & 11 deletions dvc/command/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,26 @@
logger = logging.getLogger(__name__)


def _stage_repr(stage):
    """Return the label used to display ``stage`` in pipeline output.

    A ``PipelineStage`` is rendered as ``"<relpath>:<name>"``; any other
    stage object is rendered as its ``relpath`` alone.
    """
    # Function-scope import, matching how the commands in this file pull in
    # their other dvc modules (presumably to avoid import cycles -- confirm).
    from dvc.stage import PipelineStage

    if not isinstance(stage, PipelineStage):
        return stage.relpath

    return "{}:{}".format(stage.relpath, stage.name)


class CmdPipelineShow(CmdBase):
def _show(self, target, commands, outs, locked):
import networkx
from dvc.dvcfile import Dvcfile
from dvc import dvcfile
from dvc.utils import parse_target

stage = Dvcfile(self.repo, target).load()
G = self.repo.graph
path, name = parse_target(target)
stage = dvcfile.Dvcfile(self.repo, path).load_one(name)
G = self.repo.pipeline_graph
stages = networkx.dfs_postorder_nodes(G, stage)

if locked:
stages = [s for s in stages if s.locked]

Expand All @@ -29,14 +40,16 @@ def _show(self, target, commands, outs, locked):
for out in stage.outs:
logger.info(str(out))
else:
logger.info(stage.path_in_repo)
logger.info(_stage_repr(stage))

def _build_graph(self, target, commands, outs):
def _build_graph(self, target, commands=False, outs=False):
import networkx
from dvc.dvcfile import Dvcfile
from dvc import dvcfile
from dvc.repo.graph import get_pipeline
from dvc.utils import parse_target

target_stage = Dvcfile(self.repo, target).load()
path, name = parse_target(target)
target_stage = dvcfile.Dvcfile(self.repo, path).load_one(name)
G = get_pipeline(self.repo.pipelines, target_stage)

nodes = set()
Expand All @@ -49,7 +62,7 @@ def _build_graph(self, target, commands, outs):
for out in stage.outs:
nodes.add(str(out))
else:
nodes.add(stage.relpath)
nodes.add(_stage_repr(stage))

edges = []
for from_stage, to_stage in networkx.edge_dfs(G, target_stage):
Expand All @@ -62,7 +75,7 @@ def _build_graph(self, target, commands, outs):
for to_out in to_stage.outs:
edges.append((str(from_out), str(to_out)))
else:
edges.append((from_stage.relpath, to_stage.relpath))
edges.append((_stage_repr(from_stage), _stage_repr(to_stage)))

return list(nodes), edges, networkx.is_tree(G)

Expand Down Expand Up @@ -150,7 +163,7 @@ def run(self):
pipelines = self.repo.pipelines
for pipeline in pipelines:
for stage in pipeline:
logger.info(stage.relpath)
logger.info(_stage_repr(stage))
if len(pipeline) != 0:
logger.info("=" * 80)
logger.info("{} pipelines total".format(len(pipelines)))
Expand Down
2 changes: 2 additions & 0 deletions dvc/command/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def run(self):
outs_persist=self.args.outs_persist,
outs_persist_no_cache=self.args.outs_persist_no_cache,
always_changed=self.args.always_changed,
name=self.args.name,
)
except DvcException:
logger.exception("failed to run command")
Expand Down Expand Up @@ -96,6 +97,7 @@ def add_parser(subparsers, parent_parser):
default=[],
help="Declare dependencies for reproducible cmd.",
)
run_parser.add_argument("-n", "--name", help=argparse.SUPPRESS)
run_parser.add_argument(
"-o",
"--outs",
Expand Down
Loading

0 comments on commit f309b79

Please sign in to comment.