dvc.objects: extend diskcache to handle pickle errors (iterative#7734)
skshetry authored May 12, 2022
1 parent ee29888 commit b3fb0df
Showing 8 changed files with 151 additions and 94 deletions.
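
The heart of the change: instead of wrapping every call site with the old with_diskcache decorator, DVC's on-disk caches are now created through dvc.objects.cache.Cache, which pins disk_pickle_protocol to 4 and re-raises pickle failures as a typed DiskError that the CLI can report with a pointer to the error docs. A minimal sketch of the new call pattern, assuming a scratch directory (the path, key, and value below are purely illustrative):

from dvc.objects.cache import Cache, DiskError

# disk_pickle_protocol defaults to 4 inside the wrapper; the error "type"
# defaults to the directory basename unless disk_type is passed explicitly.
cache = Cache("/tmp/dvc-demo/md5s", eviction_policy="least-recently-used")

try:
    cache["some-inode"] = ("mtime", "1024", "d41d8cd98f00b204e9800998ecf8427e")
    print(cache["some-inode"])
except DiskError as exc:
    # Raised when diskcache's pickle layer cannot read or write an entry,
    # e.g. the cache on disk was written with an incompatible protocol.
    print(f"broken '{exc.type}' cache at {exc.directory}")
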
60 changes: 39 additions & 21 deletions dvc/cli/__init__.py
@@ -36,6 +36,43 @@ def parse_args(argv=None):
return args


def _log_exceptions(exc: Exception):
"""Try to log some known exceptions, that are not DVCExceptions."""
from dvc.objects.cache import DiskError
from dvc.utils import error_link

if isinstance(exc, DiskError):
from dvc.utils import relpath

directory = relpath(exc.directory)
logger.exception(
f"Could not open pickled '{exc.type}' cache.\n"
f"Remove the '{directory}' directory and then retry this command."
f"\nSee {error_link('pickle')} for more information.",
extra={"tb_only": True},
)
return

if isinstance(exc, OSError):
import errno

if exc.errno == errno.EMFILE:
logger.exception(
"too many open files, please visit "
"{} to see how to handle this "
"problem".format(error_link("many-files")),
extra={"tb_only": True},
)
return

from dvc.info import get_dvc_info
from dvc.logger import FOOTER

logger.exception("unexpected error")
logger.debug("Version info for developers:\n%s", get_dvc_info())
logger.info(FOOTER)


def main(argv=None): # noqa: C901
"""Main entry point for dvc CLI.
@@ -48,7 +85,7 @@ def main(argv=None): # noqa: C901
from dvc._debug import debugtools
from dvc.config import ConfigError
from dvc.exceptions import DvcException, NotDvcRepoError
from dvc.logger import FOOTER, disable_other_loggers
from dvc.logger import disable_other_loggers

# NOTE: stderr/stdout may be closed if we are running from dvc.daemon.
# On Linux we directly call cli.main after double forking and closing
@@ -104,26 +141,7 @@ def main(argv=None): # noqa: C901
ret = 254
except Exception as exc: # noqa, pylint: disable=broad-except
# pylint: disable=no-member
import errno

if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
from dvc.utils import error_link

logger.exception(
"too many open files, please visit "
"{} to see how to handle this "
"problem".format(error_link("many-files")),
extra={"tb_only": True},
)
else:
from dvc.info import get_dvc_info

logger.exception("unexpected error")

dvc_info = get_dvc_info()
logger.debug("Version info for developers:\n%s", dvc_info)

logger.info(FOOTER)
_log_exceptions(exc)
ret = 255

try:
23 changes: 6 additions & 17 deletions dvc/data/db/index.py
@@ -4,7 +4,6 @@
from typing import TYPE_CHECKING, Iterable, Set

from dvc.objects.errors import ObjectDBError
from dvc.utils.decorators import with_diskcache

if TYPE_CHECKING:
from dvc.types import StrPath
@@ -89,49 +88,40 @@ def __init__(
tmp_dir: "StrPath",
name: str,
): # pylint: disable=super-init-not-called
from diskcache import Cache, Index

from dvc.fs.local import LocalFileSystem
from dvc.objects.cache import Cache, Index
from dvc.utils.fs import makedirs

self.index_dir = os.path.join(tmp_dir, self.INDEX_DIR, name)
makedirs(self.index_dir, exist_ok=True)
self.fs = LocalFileSystem()
self.index = Index.fromcache(
Cache(
self.index_dir,
disk_pickle_protocol=4,
eviction_policy="none",
)
cache = Cache(
self.index_dir, eviction_policy="none", disk_type="index"
)
self.index = Index.fromcache(cache)

@with_diskcache(name="index")
def __iter__(self):
return iter(self.index)

@with_diskcache(name="index")
def __contains__(self, hash_):
return hash_ in self.index

@with_diskcache(name="index")
def dir_hashes(self):
"""Iterate over .dir hashes stored in the index."""
yield from (hash_ for hash_, is_dir in self.index.items() if is_dir)

@with_diskcache(name="index")
def clear(self):
"""Clear this index (to force re-indexing later)."""
from diskcache import Timeout
from dvc.objects.cache import Timeout

try:
self.index.clear()
except Timeout as exc:
raise ObjectDBError("Failed to clear ODB index") from exc

@with_diskcache(name="index")
def update(self, dir_hashes: Iterable[str], file_hashes: Iterable[str]):
"""Update this index, adding the specified hashes."""
from diskcache import Timeout
from dvc.objects.cache import Timeout

try:
with self.index.transact():
@@ -143,7 +133,6 @@ def update(self, dir_hashes: Iterable[str], file_hashes: Iterable[str]):
except Timeout as exc:
raise ObjectDBError("Failed to update ODB index") from exc

@with_diskcache(name="index")
def intersection(self, hashes: Set[str]):
"""Iterate over values from `hashes` which exist in the index."""
yield from hashes.intersection(self.index.keys())
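
With the decorator gone, a pickle failure inside any of these index operations now surfaces as DiskError from the backing cache itself. A rough sketch of how the index store is built and queried, assuming a throwaway temp directory (the path and hash below are illustrative):

import os
from dvc.objects.cache import Cache, Index

index_dir = os.path.join("/tmp/dvc-demo", "index", "remote-name")  # illustrative
os.makedirs(index_dir, exist_ok=True)

# Mirrors ObjectDBIndex.__init__: disk_type="index" labels this cache so a
# DiskError message names which on-disk cache is broken.
cache = Cache(index_dir, eviction_policy="none", disk_type="index")
index = Index.fromcache(cache)

index["d3b07384d113edec49eaa6238ad5ff00.dir"] = True  # hash -> is_dir flag
print([hash_ for hash_, is_dir in index.items() if is_dir])  # cf. dir_hashes()
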
61 changes: 61 additions & 0 deletions dvc/objects/cache.py
@@ -0,0 +1,61 @@
import os
import pickle
from functools import wraps
from typing import Any

import diskcache
from diskcache import Disk as disk
from diskcache import Index # noqa: F401, pylint: disable=unused-import
from diskcache import Timeout # noqa: F401, pylint: disable=unused-import

# pylint: disable=redefined-builtin


class DiskError(Exception):
def __init__(self, directory: str, type: str) -> None:
self.directory = directory
self.type = type
super().__init__(f"Could not open disk '{type}' in {directory}")


def translate_pickle_error(fn):
@wraps(fn)
def wrapped(self, *args, **kwargs):
try:
return fn(self, *args, **kwargs)
except (pickle.PickleError, ValueError) as e:
if isinstance(e, ValueError) and "pickle protocol" not in str(e):
raise
# pylint: disable=protected-access
raise DiskError(self._directory, type=self._type) from e

return wrapped


class Disk(disk):
"""Reraise pickle-related errors as DiskError."""

def __init__(self, *args, type=None, **kwargs):
super().__init__(*args, **kwargs)
self._type = type or os.path.basename(self._directory)

put = translate_pickle_error(disk.put)
get = translate_pickle_error(disk.get)
store = translate_pickle_error(disk.store)
fetch = translate_pickle_error(disk.fetch)


class Cache(diskcache.Cache):
"""Extended to handle pickle errors and use a constant pickle protocol."""

def __init__(
self,
directory: str = None,
timeout: int = 60,
disk: disk = Disk, # pylint: disable=redefined-outer-name
**settings: Any,
) -> None:
settings.setdefault("disk_pickle_protocol", 4)
super().__init__(
directory=directory, timeout=timeout, disk=Disk, **settings
)
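
A quick way to see the translation in action is to force a pickle failure, much as the unit test further down does; a small sketch assuming a scratch directory:

import pickle
from dvc.objects.cache import Cache, DiskError

# Requesting a protocol newer than this interpreter supports makes
# pickle.dumps fail inside diskcache's Disk.store; the wrapper re-raises
# that ValueError as DiskError.
cache = Cache("/tmp/cache-demo", disk_pickle_protocol=pickle.HIGHEST_PROTOCOL + 1)
try:
    cache["key"] = ("value1", "value2")
except DiskError as exc:
    print(exc.type, exc.directory)  # type falls back to the directory basename
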
13 changes: 2 additions & 11 deletions dvc/objects/state.py
@@ -7,7 +7,6 @@
from dvc.fs.local import LocalFileSystem
from dvc.fs.system import inode as get_inode
from dvc.utils import relpath
from dvc.utils.decorators import with_diskcache

from .hash_info import HashInfo
from .utils import get_mtime_and_size
@@ -49,7 +48,7 @@ def save_link(self, path, fs):

class State(StateBase): # pylint: disable=too-many-instance-attributes
def __init__(self, root_dir=None, tmp_dir=None, dvcignore=None):
from diskcache import Cache
from .cache import Cache

super().__init__()

@@ -60,18 +59,14 @@ def __init__(self, root_dir=None, tmp_dir=None, dvcignore=None):
if not tmp_dir:
return

config = {
"eviction_policy": "least-recently-used",
"disk_pickle_protocol": 4,
}
config = {"eviction_policy": "least-recently-used"}
self.links = Cache(directory=os.path.join(tmp_dir, "links"), **config)
self.md5s = Cache(directory=os.path.join(tmp_dir, "md5s"), **config)

def close(self):
self.md5s.close()
self.links.close()

@with_diskcache(name="md5s")
def save(self, path, fs, hash_info):
"""Save hash for the specified path info.
@@ -96,7 +91,6 @@ def save(self, path, fs, hash_info):

self.md5s[inode] = (mtime, str(size), hash_info.value)

@with_diskcache(name="md5s")
def get(self, path, fs):
"""Gets the hash for the specified path info. Hash will be
retrieved from the state database if available.
@@ -127,7 +121,6 @@ def get(self, path, fs):

return Meta(size=size), HashInfo("md5", value[2])

@with_diskcache(name="links")
def save_link(self, path, fs):
"""Adds the specified path to the list of links created by dvc. This
list is later used on `dvc checkout` to cleanup old links.
@@ -149,7 +142,6 @@ def save_link(self, path, fs):
with self.links as ref:
ref[relative_path] = (inode, mtime)

@with_diskcache(name="links")
def get_unused_links(self, used, fs):
"""Removes all saved links except the ones that are used.
@@ -177,7 +169,6 @@ def get_unused_links(self, used, fs):

return unused

@with_diskcache(name="links")
def remove_links(self, unused, fs):
if not isinstance(fs, LocalFileSystem):
return
30 changes: 0 additions & 30 deletions dvc/utils/decorators.py

This file was deleted.

Empty file added tests/unit/cli/__init__.py
Empty file.
26 changes: 26 additions & 0 deletions tests/unit/cli/test_main.py
@@ -0,0 +1,26 @@
from argparse import Namespace

from funcy import raiser

from dvc.cli import main
from dvc.objects.cache import DiskError


def test_state_pickle_errors_are_correctly_raised(tmp_dir, caplog, mocker):
path = tmp_dir / "dir" / "test"
mocker.patch(
"dvc.cli.parse_args",
return_value=Namespace(
func=raiser(DiskError(path, "md5s")),
quiet=False,
verbose=True,
),
)

assert main() == 255
assert (
f"Could not open pickled 'md5s' cache.\n"
f"Remove the '{path.relative_to(tmp_dir)}' directory "
"and then retry this command.\n"
"See <https://error.dvc.org/pickle> for more information."
) in caplog.text
@@ -1,27 +1,29 @@
import pickle
from typing import Any

import diskcache
import pytest

from dvc.exceptions import DvcException
from dvc.utils.decorators import with_diskcache
from dvc.objects.cache import Cache, DiskError


@with_diskcache(name="test")
def set_value(cache: diskcache.Cache, key: str, value: Any) -> Any:
def set_value(cache: Cache, key: str, value: Any) -> Any:
cache[key] = value
return cache[key]


def test_pickle_protocol_error(tmp_dir):
with pytest.raises(DvcException) as exc:
with diskcache.Cache(
directory=(tmp_dir / "test"),
disk_pickle_protocol=pickle.HIGHEST_PROTOCOL + 1,
) as cache:
set_value(cache, "key", ("value1", "value2"))
assert "troubleshooting#pickle" in str(exc)
@pytest.mark.parametrize("disk_type", [None, "test"])
def test_pickle_protocol_error(tmp_dir, disk_type):
directory = tmp_dir / "test"
cache = Cache(
directory,
disk_pickle_protocol=pickle.HIGHEST_PROTOCOL + 1,
disk_type=disk_type,
)
with pytest.raises(DiskError) as exc, cache as cache:
set_value(cache, "key", ("value1", "value2"))
assert exc.value.directory == str(directory)
assert exc.value.type == "test"
assert f"Could not open disk 'test' in {directory}" == str(exc.value)


@pytest.mark.parametrize(
@@ -32,12 +34,12 @@ def test_pickle_protocol_error(tmp_dir):
],
)
def test_pickle_backwards_compat(tmp_dir, proto_a, proto_b):
with diskcache.Cache(
with Cache(
directory=(tmp_dir / "test"),
disk_pickle_protocol=proto_a,
) as cache:
set_value(cache, "key", ("value1", "value2"))
with diskcache.Cache(
with Cache(
directory=(tmp_dir / "test"),
disk_pickle_protocol=proto_b,
) as cache:
