Skip to content

Commit

Permalink
Do the de-duplication in _reproduce_stages
Browse files Browse the repository at this point in the history
  • Loading branch information
charlesbaynham committed Apr 21, 2020
1 parent f5d89f6 commit 06a1a96
Showing 1 changed file with 41 additions and 42 deletions.
83 changes: 41 additions & 42 deletions dvc/repo/reproduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,12 @@ def reproduce(
path, name=name, recursive=recursive, graph=active_graph
)

ret = []
checked_stages = set()
for target in targets:
stages, these_checked_stages = _reproduce_stages(
active_graph, target, checked_stages, **kwargs
)
ret.extend(stages)
checked_stages.update(these_checked_stages)

return ret
return _reproduce_stages(active_graph, targets, **kwargs)


def _reproduce_stages(
G,
stage,
checked_stages,
stages,
downstream=False,
ignore_build_cache=False,
single_item=False,
Expand Down Expand Up @@ -152,36 +142,45 @@ def _reproduce_stages(
import networkx as nx

if single_item:
pipeline = [stage]
elif downstream:
# NOTE (py3 only):
# Python's `deepcopy` defaults to pickle/unpickle the object.
# Stages are complex objects (with references to `repo`, `outs`,
# and `deps`) that cause struggles when you try to serialize them.
# We need to create a copy of the graph itself, and then reverse it,
# instead of using graph.reverse() directly because it calls
# `deepcopy` underneath -- unless copy=False is specified.
pipeline = nx.dfs_preorder_nodes(G.copy().reverse(copy=False), stage)
all_pipelines = stages
else:
pipeline = nx.dfs_postorder_nodes(G, stage)
all_pipelines = []
for stage in stages:
if downstream:
# NOTE (py3 only):
# Python's `deepcopy` defaults to pickle/unpickle the object.
# Stages are complex objects (with references to `repo`,
# `outs`, and `deps`) that cause struggles when you try
# to serialize them. We need to create a copy of the graph
# itself, and then reverse it, instead of using
# graph.reverse() directly because it calls `deepcopy`
# underneath -- unless copy=False is specified.
all_pipelines += nx.dfs_preorder_nodes(
G.copy().reverse(copy=False), stage
)
else:
all_pipelines += nx.dfs_postorder_nodes(G, stage)

pipeline = []
for stage in all_pipelines:
if stage not in pipeline:
pipeline.append(stage)

result = []
these_checked_stages = []
for st in pipeline:
if st not in checked_stages:
try:
ret = _reproduce_stage(st, **kwargs)
these_checked_stages.append(st)

if len(ret) != 0 and ignore_build_cache:
# NOTE: we are walking our pipeline from the top to the
# bottom. If one stage is changed, it will be reproduced,
# which tells us that we should force reproducing all of
# the other stages down below, even if their direct
# dependencies didn't change.
kwargs["force"] = True

result.extend(ret)
except Exception as exc:
raise ReproductionError(st.relpath) from exc
return result, these_checked_stages
for stage in pipeline:
try:
ret = _reproduce_stage(stage, **kwargs)

if len(ret) != 0 and ignore_build_cache:
# NOTE: we are walking our pipeline from the top to the
# bottom. If one stage is changed, it will be reproduced,
# which tells us that we should force reproducing all of
# the other stages down below, even if their direct
# dependencies didn't change.
kwargs["force"] = True

result.extend(ret)
except Exception as exc:
raise ReproductionError(stage.relpath) from exc

return result

0 comments on commit 06a1a96

Please sign in to comment.