Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvc: implement multi-stage dvcfile #3584

Merged
merged 1 commit into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions dvc/command/pipeline.py
Original file line number Diff line number Diff line change
@@ -8,15 +8,26 @@
logger = logging.getLogger(__name__)


def _stage_repr(stage):
efiop marked this conversation as resolved.
Show resolved Hide resolved
from dvc.stage import PipelineStage

return (
"{}:{}".format(stage.relpath, stage.name)
if isinstance(stage, PipelineStage)
else stage.relpath
)


class CmdPipelineShow(CmdBase):
def _show(self, target, commands, outs, locked):
import networkx
from dvc.dvcfile import Dvcfile
from dvc import dvcfile
from dvc.utils import parse_target

stage = Dvcfile(self.repo, target).load()
G = self.repo.graph
path, name = parse_target(target)
stage = dvcfile.Dvcfile(self.repo, path).load_one(name)
G = self.repo.pipeline_graph
stages = networkx.dfs_postorder_nodes(G, stage)

if locked:
stages = [s for s in stages if s.locked]

@@ -29,14 +40,16 @@ def _show(self, target, commands, outs, locked):
for out in stage.outs:
logger.info(str(out))
else:
logger.info(stage.path_in_repo)
logger.info(_stage_repr(stage))

def _build_graph(self, target, commands, outs):
def _build_graph(self, target, commands=False, outs=False):
import networkx
from dvc.dvcfile import Dvcfile
from dvc import dvcfile
from dvc.repo.graph import get_pipeline
from dvc.utils import parse_target

target_stage = Dvcfile(self.repo, target).load()
path, name = parse_target(target)
target_stage = dvcfile.Dvcfile(self.repo, path).load_one(name)
G = get_pipeline(self.repo.pipelines, target_stage)

nodes = set()
@@ -49,7 +62,7 @@ def _build_graph(self, target, commands, outs):
for out in stage.outs:
nodes.add(str(out))
else:
nodes.add(stage.relpath)
nodes.add(_stage_repr(stage))

edges = []
for from_stage, to_stage in networkx.edge_dfs(G, target_stage):
@@ -62,7 +75,7 @@ def _build_graph(self, target, commands, outs):
for to_out in to_stage.outs:
edges.append((str(from_out), str(to_out)))
else:
edges.append((from_stage.relpath, to_stage.relpath))
edges.append((_stage_repr(from_stage), _stage_repr(to_stage)))

return list(nodes), edges, networkx.is_tree(G)

@@ -150,7 +163,7 @@ def run(self):
pipelines = self.repo.pipelines
for pipeline in pipelines:
for stage in pipeline:
logger.info(stage.relpath)
logger.info(_stage_repr(stage))
if len(pipeline) != 0:
logger.info("=" * 80)
logger.info("{} pipelines total".format(len(pipelines)))
Expand Down
2 changes: 2 additions & 0 deletions dvc/command/run.py
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@ def run(self):
outs_persist=self.args.outs_persist,
outs_persist_no_cache=self.args.outs_persist_no_cache,
always_changed=self.args.always_changed,
name=self.args.name,
)
except DvcException:
logger.exception("failed to run command")
@@ -96,6 +97,7 @@ def add_parser(subparsers, parent_parser):
default=[],
help="Declare dependencies for reproducible cmd.",
)
run_parser.add_argument("-n", "--name", help=argparse.SUPPRESS)
run_parser.add_argument(
"-o",
"--outs",
Expand Down
Loading