Skip to content

Commit

Permalink
dvc.data: save and try loading raw dir objects
Browse files Browse the repository at this point in the history
When staging a directory, always save a "raw dir object" to the odb.
If the corresponding ".dir" object has not been added to the odb,
`stage()` calls can load the tree from the raw dir object instead of
rebuilding it by walking the directory.

This can lead to significant speed improvements when calling
`dvc status` for modified directories.

Fixes iterative#7390
  • Loading branch information
dtrifiro authored and efiop committed May 12, 2022
1 parent b3fb0df commit 7a795f7
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 21 deletions.
105 changes: 89 additions & 16 deletions dvc/data/stage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import errno
import hashlib
import logging
import os
Expand All @@ -19,6 +20,9 @@
from dvc.fs.base import AnyFSPath, FileSystem
from dvc.objects.db import ObjectDB

from .tree import Tree


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -233,7 +237,7 @@ def _make_staging_url(
return url


def _get_staging(odb: "ObjectDB") -> "ObjectDB":
def _get_staging(odb: "ObjectDB") -> "ReferenceObjectDB":
"""Return an ODB that can be used for staging objects.
Staging will be a reference ODB stored in the the global memfs.
Expand All @@ -247,27 +251,90 @@ def _get_staging(odb: "ObjectDB") -> "ObjectDB":
return ReferenceObjectDB(fs, fs_path, state=state)


def _load_from_state(odb, staging, fs_path, fs, name):
def _load_raw_dir_obj(odb: "ObjectDB", hash_info: "HashInfo") -> "Tree":
from dvc.objects.errors import ObjectFormatError

from .tree import Tree

try:
tree = Tree.load(odb, hash_info.as_raw())
tree.check(odb)
tree.hash_info = hash_info
except ObjectFormatError as exc:
raise FileNotFoundError(
errno.ENOENT,
"No such object",
odb.hash_to_path(hash_info.as_raw().value),
) from exc

return tree


def _load_from_state(
odb: "ObjectDB",
staging: "ReferenceObjectDB",
fs_path: "AnyFSPath",
fs: "FileSystem",
name: str,
dry_run: bool,
) -> Tuple["ObjectDB", "Meta", "HashFile"]:
from dvc.objects.errors import ObjectFormatError

from . import check, load
from .tree import Tree

state = odb.state
meta, hash_info = state.get(fs_path, fs)
if hash_info:
for odb_ in (odb, staging):
if odb_.exists(hash_info):
try:
obj = load(odb_, hash_info)
check(odb_, obj, check_hash=False)
if isinstance(obj, Tree):
meta.nfiles = len(obj)
assert obj.hash_info.name == name
return odb_, meta, obj
except (ObjectFormatError, FileNotFoundError):
pass
raise FileNotFoundError
if not hash_info:
raise FileNotFoundError

for odb_ in (odb, staging):
if not odb_.exists(hash_info):
continue

try:
obj = load(odb, hash_info)
check(odb, obj, check_hash=False)
except (ObjectFormatError, FileNotFoundError):
continue

if isinstance(obj, Tree):
meta.nfiles = len(obj)
assert obj.hash_info.name == name
return odb_, meta, obj

if not hash_info.isdir:
raise FileNotFoundError

# Try loading the raw dir object saved by `stage`, see below and #7390
tree = _load_raw_dir_obj(odb, hash_info)
meta.nfiles = len(tree)
assert tree.hash_info.name == name

if not dry_run:
assert tree.fs
for key, _, oid in tree:
staging.add(
fs.path.join(fs_path, *key),
fs,
oid,
hardlink=False,
verify=False,
)

staging.add(
tree.fs_path,
tree.fs,
hash_info,
hardlink=False,
)

raw = staging.get(hash_info)
tree.fs = raw.fs
tree.fs_path = raw.fs_path

logger.debug("loaded tree '%s' from raw dir obj", tree)
return staging, meta, tree


def _stage_external_tree_info(odb, tree, name):
Expand Down Expand Up @@ -318,7 +385,7 @@ def stage(
staging = _get_staging(odb)
if odb:
try:
return _load_from_state(odb, staging, fs_path, fs, name)
return _load_from_state(odb, staging, fs_path, fs, name, dry_run)
except FileNotFoundError:
pass

Expand All @@ -336,6 +403,12 @@ def stage(
logger.debug("staged tree '%s'", obj)
if name != "md5":
obj = _stage_external_tree_info(odb, obj, name)

# In order to avoid re-building the tree when it is not committed to
# the local odb (e.g. for a status call), we save it as a raw object.
# Loading this instead of building the tree can speed up `dvc status`
# for modified directories, see #7390
odb.add(obj.fs_path, obj.fs, obj.hash_info.as_raw())
else:
_, meta, obj = _stage_file(
fs_path,
Expand Down
2 changes: 1 addition & 1 deletion dvc/data/tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def from_list(cls, lst):
return tree

@classmethod
def load(cls, odb, hash_info):
def load(cls, odb, hash_info) -> "Tree":
obj = odb.get(hash_info)

try:
Expand Down
6 changes: 6 additions & 0 deletions dvc/objects/hash_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ def isdir(self):
if not self:
return False
return self.value.endswith(HASH_DIR_SUFFIX)

def as_raw(self) -> "HashInfo":
assert self.value
return HashInfo(
self.name, self.value.rsplit(HASH_DIR_SUFFIX)[0], self.obj_name
)
9 changes: 8 additions & 1 deletion tests/func/test_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,22 @@ def test_commit_granular_dir(tmp_dir, dvc):

cache = tmp_dir / ".dvc" / "cache"

assert set(cache.glob("*/*")) == set()
assert set(cache.glob("*/*")) == {
cache / "1a" / "ca2c799df82929bbdd976557975546",
}

dvc.commit(os.path.join("data", "foo"))
assert set(cache.glob("*/*")) == {
cache / "1a" / "ca2c799df82929bbdd976557975546",
cache / "1a" / "ca2c799df82929bbdd976557975546.dir",
cache / "ac" / "bd18db4cc2f85cedef654fccc4a4d8",
}
clean_staging()

dvc.commit(os.path.join("data", "subdir"))
assert set(cache.glob("*/*")) == {
cache / "26" / "d6b64d96a660707412f523e8184b5f",
cache / "1a" / "ca2c799df82929bbdd976557975546",
cache / "1a" / "ca2c799df82929bbdd976557975546.dir",
cache / "ac" / "bd18db4cc2f85cedef654fccc4a4d8",
cache / "4c" / "e8d2a2cf314a52fa7f315ca37ca445",
Expand All @@ -197,6 +202,8 @@ def test_commit_granular_dir(tmp_dir, dvc):

dvc.commit(os.path.join("data"))
assert set(cache.glob("*/*")) == {
cache / "26" / "d6b64d96a660707412f523e8184b5f",
cache / "1a" / "ca2c799df82929bbdd976557975546",
cache / "1a" / "ca2c799df82929bbdd976557975546.dir",
cache / "ac" / "bd18db4cc2f85cedef654fccc4a4d8",
cache / "4c" / "e8d2a2cf314a52fa7f315ca37ca445",
Expand Down
1 change: 1 addition & 0 deletions tests/func/test_external_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def test_subrepos_are_ignored(tmp_dir, erepo_dir):
hardlink=True,
)
assert set(cache_dir.glob("??/*")) == {
cache_dir / "e1" / "d9e8eae5374860ae025ec84cfd85c7",
cache_dir / "e1" / "d9e8eae5374860ae025ec84cfd85c7.dir",
cache_dir / "37" / "b51d194a7513e45b56f6524f2d51f2",
cache_dir / "94" / "7d2b84e5aa88170e80dff467a5bfb6",
Expand Down
9 changes: 6 additions & 3 deletions tests/func/test_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ def setUp(self):
super().setUp()

self.dvc.add(self.FOO)
self.dvc.add(self.DATA_DIR)
stages = self.dvc.add(self.DATA_DIR)
raw_dir_hash = stages[0].outs[0].hash_info.as_raw().value

self.good_cache = [
self.dvc.odb.local.hash_to_path(md5)
for md5 in self.dvc.odb.local.all()
if md5 != raw_dir_hash
]

self.bad_cache = []
self.bad_cache = [self.dvc.odb.local.hash_to_path(raw_dir_hash)]
for i in ["123", "234", "345"]:
path = os.path.join(self.dvc.odb.local.cache_dir, i[0:2], i[2:])
self.create(path, i)
Expand Down Expand Up @@ -203,7 +206,7 @@ def test_gc_no_dir_cache(tmp_dir, dvc):
with pytest.raises(CollectCacheError):
dvc.gc(workspace=True)

assert _count_files(dvc.odb.local.cache_dir) == 4
assert _count_files(dvc.odb.local.cache_dir) == 5
dvc.gc(force=True, workspace=True)
assert _count_files(dvc.odb.local.cache_dir) == 2

Expand Down
3 changes: 3 additions & 0 deletions tests/func/test_odb.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ def test_shared_cache(tmp_dir, dvc, group):

expected = {
os.path.join(cache_dir, "17"): dir_mode,
os.path.join(
cache_dir, "17", "4eaa1dd94050255b7b98a7e1924b31"
): file_mode,
os.path.join(
cache_dir, "17", "4eaa1dd94050255b7b98a7e1924b31.dir"
): file_mode,
Expand Down
3 changes: 3 additions & 0 deletions tests/func/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ def test_dir_hash_should_be_key_order_agnostic(tmp_dir, dvc):
_, _, obj = stage(dvc.odb.local, path, dvc.odb.local.fs, "md5")
hash1 = obj.hash_info

# remove the raw dir obj to force building the tree on the next stage call
dvc.odb.local.fs.remove(dvc.odb.local.hash_to_path(hash1.as_raw().value))

tree = Tree.from_list(
[{"md5": "1", "relpath": "1"}, {"md5": "2", "relpath": "2"}]
)
Expand Down
60 changes: 60 additions & 0 deletions tests/func/test_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dvc.utils.serialize import dump_yaml, load_yaml
from dvc.utils.strictyaml import YAMLValidationError
from tests.basic_env import TestDvc
from tests.utils import clean_staging


def test_cmd_obj():
Expand Down Expand Up @@ -321,3 +322,62 @@ def test_stage_run_checkpoint(tmp_dir, dvc, mocker, checkpoint):
mock_cmd_run.assert_called_with(
stage, checkpoint_func=callback, dry=False, run_env=None
)


@pytest.mark.parametrize(
"dry_run, expected_staging_contents",
[
(True, set()),
(
False,
{
"37b51d194a7513e45b56f6524f2d51f2",
"568f3dd88592a68ef99459a5491011cd",
"68dde2c3c4e7953c2290f176bbdc9a54",
"fd4034d9514d6e875538422c8b0dbeb2.dir",
},
),
],
)
def test_stage_dir_optimization(
tmp_dir, dvc, mocker, dry_run, expected_staging_contents
):
from dvc.data import stage
from dvc.data.tree import Tree

tmp_dir.dvc_gen(
{
"data": {
"foo": "bar",
"subdir": {"subfoo": "subbar"},
}
}
)
odb = dvc.odb.local

objs = set(odb.all())
clean_staging()

tmp_dir.gen({"data": {"baz": "quz"}})

stage_spy = mocker.spy(stage, "_stage_tree")
_, _, tree = stage.stage(odb, "data", odb.fs, odb.fs.PARAM_CHECKSUM)

assert stage_spy.called
assert set(odb.all()) - objs == {tree.hash_info.as_raw().value}
stage_spy.reset_mock()
clean_staging()

load_spy = mocker.spy(Tree, "load")
build_tree_spy = mocker.spy(stage, "_build_tree")

staging, _, tree = stage.stage(
odb, "data", odb.fs, odb.fs.PARAM_CHECKSUM, dry_run=dry_run
)
assert not stage_spy.called
assert not build_tree_spy.called

load_args, _ = load_spy.call_args
assert load_args[1].value == tree.hash_info.as_raw().value

assert set(staging.all()) == expected_staging_contents
17 changes: 17 additions & 0 deletions tests/unit/test_hashinfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dvc.objects.hash_info import HashInfo


def test_as_raw():
hash_info = HashInfo(
"md5", "a1d0c6e83f027327d8461063f4ac58a6.dir", "objname"
)

raw = hash_info.as_raw()

assert hash_info.name == "md5"
assert hash_info.value == "a1d0c6e83f027327d8461063f4ac58a6.dir"
assert hash_info.obj_name == "objname"

assert raw.name == "md5"
assert raw.value == "a1d0c6e83f027327d8461063f4ac58a6"
assert raw.obj_name == "objname"

0 comments on commit 7a795f7

Please sign in to comment.