Skip to content

Commit

Permalink
Merge pull request #2030 from prihoda/recursive-repro
Browse files Browse the repository at this point in the history
Add repro -R, support dirs & stage files when running pull/push/repro…
  • Loading branch information
efiop authored May 27, 2019
2 parents 9e0e092 + d072c2b commit 29fd8e2
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 16 deletions.
11 changes: 9 additions & 2 deletions dvc/command/repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

class CmdRepro(CmdBase):
def run(self):
recursive = not self.args.single_item
saved_dir = os.path.realpath(os.curdir)
if self.args.cwd:
os.chdir(self.args.cwd)
Expand All @@ -31,7 +30,7 @@ def run(self):
try:
stages = self.repo.reproduce(
target,
recursive=recursive,
single_item=self.args.single_item,
force=self.args.force,
dry=self.args.dry,
interactive=self.args.interactive,
Expand All @@ -40,6 +39,7 @@ def run(self):
ignore_build_cache=self.args.ignore_build_cache,
no_commit=self.args.no_commit,
downstream=self.args.downstream,
recursive=self.args.recursive,
)

if len(stages) == 0:
Expand Down Expand Up @@ -128,6 +128,13 @@ def add_parser(subparsers, parent_parser):
default=False,
help="Reproduce all pipelines in the repo.",
)
repro_parser.add_argument(
"-R",
"--recursive",
action="store_true",
default=False,
help="Reproduce all stages in the specified directory.",
)
repro_parser.add_argument(
"--ignore-build-cache",
action="store_true",
Expand Down
4 changes: 2 additions & 2 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def collect(self, target, with_deps=False, recursive=False):
import networkx as nx
from dvc.stage import Stage

if not target or recursive:
if not target or (recursive and os.path.isdir(target)):
return self.active_stages(target)

stage = Stage.load(self, target)
Expand Down Expand Up @@ -306,7 +306,7 @@ def used_cache(
all_branches=all_branches, all_tags=all_tags
):
if target:
if recursive:
if recursive and os.path.isdir(target):
stages = self.stages(target)
else:
stages = self.collect(target, with_deps=with_deps)
Expand Down
26 changes: 17 additions & 9 deletions dvc/repo/reproduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _reproduce_stage(stages, node, force, dry, interactive, no_commit):
def reproduce(
self,
target=None,
recursive=True,
single_item=False,
force=False,
dry=False,
interactive=False,
Expand All @@ -44,7 +44,9 @@ def reproduce(
ignore_build_cache=False,
no_commit=False,
downstream=False,
recursive=False,
):
import networkx as nx
from dvc.stage import Stage

if not target and not all_pipelines:
Expand All @@ -56,7 +58,13 @@ def reproduce(
interactive = core.get(config.SECTION_CORE_INTERACTIVE, False)

targets = []
if pipeline or all_pipelines:
if recursive and os.path.isdir(target):
G = self.graph(from_directory=target)[1]
dir_targets = [
os.path.join(self.root_dir, n) for n in nx.dfs_postorder_nodes(G)
]
targets.extend(dir_targets)
elif pipeline or all_pipelines:
if pipeline:
stage = Stage.load(self, target)
node = os.path.relpath(stage.path, self.root_dir)
Expand All @@ -77,7 +85,7 @@ def reproduce(
stages = _reproduce(
self,
target,
recursive=recursive,
single_item=single_item,
force=force,
dry=dry,
interactive=interactive,
Expand All @@ -93,7 +101,7 @@ def reproduce(
def _reproduce(
self,
target,
recursive=True,
single_item=False,
force=False,
dry=False,
interactive=False,
Expand All @@ -109,7 +117,11 @@ def _reproduce(
stages = nx.get_node_attributes(G, "stage")
node = os.path.relpath(stage.path, self.root_dir)

if recursive:
if single_item:
ret = _reproduce_stage(
stages, node, force, dry, interactive, no_commit
)
else:
ret = _reproduce_stages(
G,
stages,
Expand All @@ -121,10 +133,6 @@ def _reproduce(
no_commit,
downstream,
)
else:
ret = _reproduce_stage(
stages, node, force, dry, interactive, no_commit
)

return ret

Expand Down
6 changes: 4 additions & 2 deletions tests/func/test_checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,10 @@ def test(self):
ret = main(["add", self.FOO])
self.assertEqual(0, ret)

with self.assertRaises(TargetNotDirectoryError):
self.dvc.checkout(target=self.FOO, recursive=True)
try:
self.dvc.checkout(target=self.FOO + ".dvc", recursive=True)
except TargetNotDirectoryError:
self.fail("should not raise TargetNotDirectoryError")


class TestCheckoutMovedCacheDirWithSymlinks(TestDvc):
Expand Down
171 changes: 171 additions & 0 deletions tests/func/test_repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,177 @@ def test(self):
self.assertIn(expected_metrics_display, self._caplog.text)


@pytest.fixture
def repro_dir(dvc_repo, repo_dir):
    """Lay out a pipeline spread over several directories.

    Every path created here is also stored as an attribute on ``repo_dir``
    so that the tests can refer to it.  Stage files, in creation order:

    * ``source.dvc``            -- dep ``FOO``, out ``source``
    * ``unrelated1.dvc``        -- dep ``source``, out ``unrelated1``
    * ``unrelated2.dvc``        -- dep ``DATA``, out ``unrelated2``
    * ``dir/first.dvc``         -- dep ``source``, out ``dir/first``,
      produced by running ``CODE``
    * ``dir/subdir/second.dvc`` -- dep ``DATA``, out ``dir/subdir/second``
    * ``dir/Dvcfile``           -- deps ``dir/first`` and
      ``dir/subdir/second``, no outs

    A directory ``emptydir`` that holds no stage files is created as well.

    NOTE(review): the ``filecmp`` assertions expect the cmd-less stages
    with one dep and one out to copy the dep onto the out -- presumably
    DVC's import-stage behaviour; confirm against ``dvc.stage``.
    """
    repo_dir.dname = "dir"
    os.mkdir(repo_dir.dname)
    repo_dir.emptydname = "emptydir"
    os.mkdir(repo_dir.emptydname)
    nested_dir = os.path.join(repo_dir.dname, "subdir")
    os.mkdir(nested_dir)

    # Root-level stage that everything inside "dir" ultimately depends on.
    repo_dir.source = "source"
    repo_dir.source_stage = "{}.dvc".format(repo_dir.source)
    source_stage = dvc_repo.run(
        fname=repo_dir.source_stage,
        outs=[repo_dir.source],
        deps=[repo_dir.FOO],
    )
    assert source_stage is not None
    assert filecmp.cmp(repo_dir.source, repo_dir.FOO, shallow=False)

    # Two root-level stages that nothing inside "dir" depends on.
    repo_dir.unrelated1 = "unrelated1"
    repo_dir.unrelated1_stage = "{}.dvc".format(repo_dir.unrelated1)
    unrelated1_stage = dvc_repo.run(
        fname=repo_dir.unrelated1_stage,
        outs=[repo_dir.unrelated1],
        deps=[repo_dir.source],
    )
    assert unrelated1_stage is not None

    repo_dir.unrelated2 = "unrelated2"
    repo_dir.unrelated2_stage = "{}.dvc".format(repo_dir.unrelated2)
    unrelated2_stage = dvc_repo.run(
        fname=repo_dir.unrelated2_stage,
        outs=[repo_dir.unrelated2],
        deps=[repo_dir.DATA],
    )
    assert unrelated2_stage is not None

    # "first" lives directly in "dir" and is built from "source" via CODE.
    repo_dir.first = os.path.join(repo_dir.dname, "first")
    repo_dir.first_stage = "{}.dvc".format(repo_dir.first)
    first_cmd_args = (repo_dir.CODE, repo_dir.source, repo_dir.first)
    first_stage = dvc_repo.run(
        fname=repo_dir.first_stage,
        deps=[repo_dir.source],
        outs=[repo_dir.first],
        cmd="python {} {} {}".format(*first_cmd_args),
    )
    assert first_stage is not None
    assert filecmp.cmp(repo_dir.first, repo_dir.FOO, shallow=False)

    # "second" lives one level deeper, in "dir/subdir".
    repo_dir.second = os.path.join(nested_dir, "second")
    repo_dir.second_stage = "{}.dvc".format(repo_dir.second)
    second_stage = dvc_repo.run(
        fname=repo_dir.second_stage,
        outs=[repo_dir.second],
        deps=[repo_dir.DATA],
    )
    assert second_stage is not None
    assert filecmp.cmp(repo_dir.second, repo_dir.DATA, shallow=False)

    # "third" has no outs; it only ties "first" and "second" together.
    repo_dir.third_stage = os.path.join(repo_dir.dname, "Dvcfile")
    third_stage = dvc_repo.run(
        fname=repo_dir.third_stage,
        deps=[repo_dir.first, repo_dir.second],
    )
    assert third_stage is not None

    yield repo_dir


def test_recursive_repro_default(dvc_repo, repro_dir):
    """Recursive repro on a dir picks up a changed dep from outside it."""
    # Replace FOO's content with BAR's so that "source" becomes outdated.
    os.unlink(repro_dir.FOO)
    shutil.copyfile(repro_dir.BAR, repro_dir.FOO)

    reproduced = dvc_repo.reproduce(repro_dir.dname, recursive=True)

    # The outside dependency ("source") and the stages inside the folder
    # that depend on it ("first", "third") must all have been reproduced.
    assert len(reproduced) == 3
    relpaths = [item.relpath for item in reproduced]
    expected_stage_files = (
        repro_dir.source_stage,
        repro_dir.first_stage,
        repro_dir.third_stage,
    )
    for stage_file in expected_stage_files:
        assert stage_file in relpaths

    # Both regenerated outputs now carry BAR's content.
    for output in (repro_dir.source, repro_dir.first):
        assert filecmp.cmp(output, repro_dir.BAR, shallow=False)


def test_recursive_repro_single(dvc_repo, repro_dir):
    """Recursive single-item repro on a dir after outside deps changed."""
    # Overwrite both FOO and DATA with BAR's content.
    for changed in (repro_dir.FOO, repro_dir.DATA):
        os.unlink(changed)
        shutil.copyfile(repro_dir.BAR, changed)

    reproduced = dvc_repo.reproduce(
        repro_dir.dname, recursive=True, single_item=True
    )

    # Only stages inside the dir whose *direct* deps changed are expected:
    # "first" is skipped because it depends on "source", which single-item
    # mode does not reproduce.  "second" must come before "third".
    assert len(reproduced) == 2
    relpaths = [item.relpath for item in reproduced]
    assert relpaths == [repro_dir.second_stage, repro_dir.third_stage]
    assert filecmp.cmp(repro_dir.second, repro_dir.BAR, shallow=False)


def test_recursive_repro_single_force(dvc_repo, repro_dir):
    """Forced recursive single-item repro on a dir with nothing changed."""
    reproduced = dvc_repo.reproduce(
        repro_dir.dname, recursive=True, single_item=True, force=True
    )
    assert len(reproduced) == 3
    relpaths = [item.relpath for item in reproduced]

    # Every stage inside the directory must have been reproduced ...
    upstream_stage_files = (repro_dir.first_stage, repro_dir.second_stage)
    for stage_file in upstream_stage_files + (repro_dir.third_stage,):
        assert stage_file in relpaths

    # ... and "third" only after both of the stages it depends on.
    third_position = relpaths.index(repro_dir.third_stage)
    for stage_file in upstream_stage_files:
        assert relpaths.index(stage_file) < third_position


def test_recursive_repro_empty_dir(dvc_repo, repro_dir):
    """Recursive repro on a directory without stage files yields nothing."""
    options = {"recursive": True, "force": True}
    reproduced = dvc_repo.reproduce(repro_dir.emptydname, **options)
    assert len(reproduced) == 0


def test_recursive_repro_recursive_missing_file(dvc_repo):
    """Recursive repro on a nonexistent stage file or directory must fail."""
    # Checked in order: a missing stage file, then a missing directory.
    for missing_target in ("notExistingStage.dvc", "notExistingDir/"):
        with pytest.raises(StageFileDoesNotExistError):
            dvc_repo.reproduce(missing_target, recursive=True)


def test_recursive_repro_on_stage_file(dvc_repo, repro_dir):
    """Recursive repro given a stage file rather than a directory."""
    reproduced = dvc_repo.reproduce(
        repro_dir.first_stage, recursive=True, force=True
    )

    # Expect the targeted stage plus the "source" stage it depends on.
    assert len(reproduced) == 2
    relpaths = [item.relpath for item in reproduced]
    for stage_file in (repro_dir.source_stage, repro_dir.first_stage):
        assert stage_file in relpaths


@pytest.fixture
def foo_copy(repo_dir, dvc_repo):
stages = dvc_repo.add(repo_dir.FOO)
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/command/test_repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"interactive": False,
"no_commit": False,
"pipeline": False,
"recursive": True,
"single_item": False,
"recursive": False,
}


Expand Down

0 comments on commit 29fd8e2

Please sign in to comment.