Skip to content

Commit

Permalink
dvc: get rid of can_be_skipped logic (#5385)
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop authored Feb 2, 2021
1 parent 9ad1e0e commit 8932e0b
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 153 deletions.
9 changes: 3 additions & 6 deletions dvc/repo/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,9 @@ def _create_stages(
external=external,
)
restore_meta(stage)
if stage.can_be_skipped:
stage = None
else:
Dvcfile(repo, stage.path).remove()
if desc:
stage.outs[0].desc = desc
Dvcfile(repo, stage.path).remove()
if desc:
stage.outs[0].desc = desc

repo._reset() # pylint: disable=protected-access

Expand Down
2 changes: 0 additions & 2 deletions dvc/repo/imp_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ def imp_url(
erepo=erepo,
)
restore_meta(stage)
if stage.can_be_skipped:
return None

if desc:
stage.outs[0].desc = desc
Expand Down
2 changes: 0 additions & 2 deletions dvc/repo/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ def run(
from dvc.stage.utils import validate_state

stage = self.stage.create_from_cli(**kwargs)
if run_cache and stage.can_be_skipped:
return None

validate_state(self, stage, force=force)

Expand Down
47 changes: 0 additions & 47 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
fill_stage_dependencies,
fill_stage_outputs,
get_dump,
stage_dump_eq,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -416,51 +415,9 @@ def update(self, rev=None):
raise StageUpdateError(self.relpath)
update_import(self, rev=rev)

@property
def can_be_skipped(self):
if not self.dvcfile.exists():
return False

has_persist_outs = any(out.persist for out in self.outs)
if has_persist_outs:
logger.warning("Build cache is ignored when persisting outputs.")
return False

if self.is_cached and not self.is_callback and not self.always_changed:
logger.info("Stage is cached, skipping")
return True

return False

def reload(self):
return self.dvcfile.stage

@property
def is_cached(self):
"""Checks if this stage has been already ran and stored"""
old = self.reload()
if old.changed_outs():
return False

# NOTE: need to save checksums for deps in order to compare them
# with what is written in the old stage.
self.save_deps()
if not stage_dump_eq(Stage, old.dumpd(), self.dumpd()):
return False

# NOTE: committing to prevent potential data duplication. For example
#
# $ dvc config cache.type hardlink
# $ echo foo > foo
# $ dvc add foo
# $ rm -f foo
# $ echo foo > foo
# $ dvc add foo # should replace foo with a link to cache
#
old.commit()

return True

def dumpd(self):
return get_dump(self)

Expand Down Expand Up @@ -729,10 +686,6 @@ def addressing(self):
def reload(self):
return self.dvcfile.stages[self.name]

@property
def is_cached(self):
return self.name in self.dvcfile.stages and super().is_cached

def _status_stage(self, ret):
if self.cmd_changed:
ret.append("changed command")
Expand Down
27 changes: 0 additions & 27 deletions dvc/stage/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import pathlib
from contextlib import suppress
from itertools import product
from typing import TYPE_CHECKING, Any, Union

from funcy import concat, first, lsplit, rpartial, without
Expand Down Expand Up @@ -166,32 +165,6 @@ def check_missing_outputs(stage):
raise MissingDataSource(paths)


def stage_dump_eq(stage_cls, old_d, new_d):
# NOTE: need to remove checksums from old dict in order to compare
# it to the new one, since the new one doesn't have checksums yet.
from ..tree.local import LocalTree
from ..tree.s3 import S3Tree

old_d.pop(stage_cls.PARAM_MD5, None)
new_d.pop(stage_cls.PARAM_MD5, None)
outs = old_d.get(stage_cls.PARAM_OUTS, [])
for out in outs:
out.pop(LocalTree.PARAM_CHECKSUM, None)
out.pop(S3Tree.PARAM_CHECKSUM, None)
out.pop(HashInfo.PARAM_SIZE, None)
out.pop(HashInfo.PARAM_NFILES, None)

# outs and deps are lists of dicts. To check equality, we need to make
# them independent of the order, so, we convert them to dicts.
combination = product(
[old_d, new_d], [stage_cls.PARAM_DEPS, stage_cls.PARAM_OUTS]
)
for coll, key in combination:
if coll.get(key):
coll[key] = {item["path"]: item for item in coll[key]}
return old_d == new_d


def compute_md5(stage):
from dvc.output.base import BaseOutput

Expand Down
10 changes: 6 additions & 4 deletions tests/func/test_run_multistage.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ def test_run_multi_stage_repeat(tmp_dir, dvc, run_copy):
}


def test_multi_stage_run_cached(tmp_dir, dvc, run_copy):
def test_multi_stage_run_cached(tmp_dir, dvc, run_copy, mocker):
from dvc.stage.run import subprocess

tmp_dir.dvc_gen("foo", "foo")

run_copy("foo", "foo2", name="copy-foo1-foo2")
stage2 = run_copy("foo", "foo2", name="copy-foo1-foo2")

assert stage2 is None
spy = mocker.spy(subprocess, "Popen")
run_copy("foo", "foo2", name="copy-foo1-foo2")
assert not spy.called


def test_multistage_dump_on_non_cached_outputs(tmp_dir, dvc):
Expand Down
87 changes: 22 additions & 65 deletions tests/func/test_run_single_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,32 +520,6 @@ def test(self):

time.sleep(1)

ret = main(
[
"run",
"-d",
self.FOO,
"-d",
self.CODE,
"-o",
"out",
"--file",
"out.dvc",
"--single-stage",
"python",
self.CODE,
self.FOO,
"out",
]
)
self.assertEqual(ret, 0)

# NOTE: check that dvcfile was NOT overwritten
self.assertEqual(stage_mtime, os.path.getmtime("out.dvc"))
stage_mtime = os.path.getmtime("out.dvc")

time.sleep(1)

ret = main(
[
"run",
Expand Down Expand Up @@ -660,20 +634,34 @@ def test_fname_changes_path_and_wdir(self):
self.assertEqual(d[Stage.PARAM_WDIR], "..")


def test_rerun_deterministic(tmp_dir, run_copy):
def test_rerun_deterministic(tmp_dir, run_copy, mocker):
from dvc.stage.run import subprocess

tmp_dir.gen("foo", "foo content")

assert run_copy("foo", "out", single_stage=True) is not None
assert run_copy("foo", "out", single_stage=True) is None
spy = mocker.spy(subprocess, "Popen")

run_copy("foo", "out", single_stage=True)
assert spy.called

spy.reset_mock()
run_copy("foo", "out", single_stage=True)
assert not spy.called


def test_rerun_deterministic_ignore_cache(tmp_dir, run_copy, mocker):
from dvc.stage.run import subprocess

def test_rerun_deterministic_ignore_cache(tmp_dir, run_copy):
tmp_dir.gen("foo", "foo content")

assert run_copy("foo", "out", single_stage=True) is not None
assert (
run_copy("foo", "out", run_cache=False, single_stage=True) is not None
)
spy = mocker.spy(subprocess, "Popen")

run_copy("foo", "out", single_stage=True)
assert spy.called

spy.reset_mock()
run_copy("foo", "out", run_cache=False, single_stage=True)
assert spy.called


def test_rerun_callback(dvc):
Expand Down Expand Up @@ -936,37 +924,6 @@ def test(self):
mock_checkout.assert_not_called()


class TestPersistentOutput(TestDvc):
def test_ignore_run_cache(self):
warning = "Build cache is ignored when persisting outputs."

with open("immutable", "w") as fobj:
fobj.write("1")

cmd = [
"run",
"--force",
"--single-stage",
"--deps",
"immutable",
"--outs-persist",
"greetings",
"echo hello>>greetings",
]

with self._caplog.at_level(logging.WARNING, logger="dvc"):
assert main(cmd) == 0
assert warning not in self._caplog.text

assert main(cmd) == 0
assert warning in self._caplog.text

# Even if the "immutable" dependency didn't change
# it should run the command again, as it is "ignoring build cache"
with open("greetings") as fobj:
assert "hello\nhello\n" == fobj.read()


def test_bad_stage_fname(tmp_dir, dvc, run_copy):
tmp_dir.dvc_gen("foo", "foo content")

Expand Down

0 comments on commit 8932e0b

Please sign in to comment.