Merge pull request #2584 from efiop/755
dvc: release locks when running a command
efiop authored Dec 13, 2019
2 parents ddfa46a + e384685 commit 8d7f20f
Showing 9 changed files with 373 additions and 25 deletions.
dvc/lock.py: 1 addition & 8 deletions
@@ -7,10 +7,9 @@
from datetime import timedelta

import zc.lockfile
from funcy.py3 import lkeep

from dvc.exceptions import DvcException
from dvc.utils import makedirs, format_link
from dvc.utils import format_link
from dvc.utils.compat import is_py3, is_py2
from dvc.progress import Tqdm

@@ -93,8 +92,6 @@ def __init__(self, lockfile, tmp_dir=None, **kwargs):
import socket

self._tmp_dir = tmp_dir
if self._tmp_dir is not None:
makedirs(self._tmp_dir, exist_ok=True)

# NOTE: this is basically Lock.__init__ copy-paste, except that
# instead of using `socket.getfqdn()` we use `socket.gethostname()`
@@ -120,10 +117,6 @@ def __init__(self, lockfile, tmp_dir=None, **kwargs):
def lockfile(self):
return self._lockfile

@property
def files(self):
return lkeep([self._lockfile, self._tmp_dir])

def lock(self):
try:
super(Lock, self).lock(timedelta(seconds=DEFAULT_TIMEOUT))
dvc/path_info.py: 11 additions & 2 deletions
@@ -23,7 +23,16 @@
fs_encoding = "utf-8"


class PathInfo(pathlib.PurePath):
class _BasePath(object):
def overlaps(self, other):
if isinstance(other, basestring):
other = self.__class__(other)
elif self.__class__ != other.__class__:
return False
return self == other or self.isin(other) or other.isin(self)


class PathInfo(pathlib.PurePath, _BasePath):
# Use __slots__ in PathInfo objects following PurePath implementation.
# This makes objects smaller and speeds up attribute access.
# We don't add any fields so it's empty.
@@ -138,7 +147,7 @@ def __repr__(self):
return "<{}.parents>".format(self.src)


class URLInfo(object):
class URLInfo(_BasePath):
DEFAULT_PORTS = {"http": 80, "https": 443, "ssh": 22, "hdfs": 0}

def __init__(self, url):
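
For illustration, a quick sketch (not part of the committed diff) of how the new overlaps helper is expected to behave, assuming the existing PathInfo.isin method: two paths overlap when they are equal or when one contains the other, plain strings are coerced to the calling class, and instances of different classes never overlap.

from dvc.path_info import PathInfo

a = PathInfo("data")
b = PathInfo("data/raw/file.csv")
c = PathInfo("models")

assert a.overlaps(b)        # "data" contains "data/raw/file.csv"
assert b.overlaps(a)        # the check is symmetric
assert a.overlaps("data")   # strings are coerced; equal paths overlap
assert not a.overlaps(c)    # disjoint paths do not overlap
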
dvc/repo/__init__.py: 7 additions & 7 deletions
@@ -76,6 +76,7 @@ def __init__(self, root_dir=None):
from dvc.repo.metrics import Metrics
from dvc.scm.tree import WorkingTree
from dvc.repo.tag import Tag
from dvc.utils import makedirs

root_dir = self.find_root(root_dir)

@@ -88,13 +89,17 @@

self.tree = WorkingTree(self.root_dir)

self.tmp_dir = os.path.join(self.dvc_dir, "tmp")
makedirs(self.tmp_dir, exist_ok=True)

hardlink_lock = self.config.config["core"].get("hardlink_lock", False)
self.lock = make_lock(
os.path.join(self.dvc_dir, "lock"),
tmp_dir=os.path.join(self.dvc_dir, "tmp"),
hardlink_lock=hardlink_lock,
friendly=True,
)

# NOTE: storing state and link_state in the repository itself to avoid
# any possible state corruption in 'shared cache dir' scenario.
self.state = State(self, self.config.config)
@@ -165,9 +170,8 @@ def _ignore(self):

flist = (
[self.config.config_local_file, updater.updater_file]
+ [self.lock.lockfile, updater.lock.lockfile, self.tmp_dir]
+ self.state.files
+ self.lock.files
+ updater.lock.files
)

if path_isin(self.cache.local.cache_dir, self.root_dir):
@@ -352,11 +356,7 @@ def _collect_graph(self, stages=None):
continue

for out in outs:
if (
out == dep.path_info
or dep.path_info.isin(out)
or out.isin(dep.path_info)
):
if out.overlaps(dep.path_info):
dep_stage = outs[out].stage
dep_node = relpath(dep_stage.path, self.root_dir)
G.add_node(dep_node, stage=dep_stage)
dvc/rwlock.py: 191 additions & 0 deletions (new file)
@@ -0,0 +1,191 @@
from __future__ import unicode_literals

import os
import json

from collections import defaultdict
from contextlib import contextmanager

from voluptuous import Schema, Invalid
from funcy.py3 import lmap, lfilter, lmapcat

from .exceptions import DvcException
from .lock import LockError
from .utils.compat import (
convert_to_unicode,
FileNotFoundError,
JSONDecodeError,
str,
)
from .utils.fs import relpath

INFO_SCHEMA = {"pid": int, "cmd": str}

SCHEMA = Schema({"write": {str: INFO_SCHEMA}, "read": {str: [INFO_SCHEMA]}})


class RWLockFileCorruptedError(DvcException):
def __init__(self, path, cause):
super(RWLockFileCorruptedError, self).__init__(
"Unable to read RWLock-file '{}'. JSON structure is "
"corrupted".format(relpath(path)),
cause=cause,
)


class RWLockFileFormatError(DvcException):
def __init__(self, path, cause):
super(RWLockFileFormatError, self).__init__(
"RWLock-file '{}' format error.".format(relpath(path)), cause=cause
)


@contextmanager
def _edit_rwlock(lock_dir):
path = os.path.join(lock_dir, "rwlock")
try:
with open(path, "r") as fobj:
lock = json.load(fobj)
lock = SCHEMA(convert_to_unicode(lock))
except FileNotFoundError:
lock = SCHEMA({})
except JSONDecodeError as exc:
raise RWLockFileCorruptedError(path, cause=exc)
except Invalid as exc:
raise RWLockFileFormatError(path, cause=exc)
lock = defaultdict(dict, lock)
lock["read"] = defaultdict(list, lock["read"])
lock["write"] = defaultdict(dict, lock["write"])
yield lock
with open(path, "w+") as fobj:
json.dump(lock, fobj)


def _infos_to_str(infos):
return "\n".join(
" (PID {}): {}".format(info["pid"], info["cmd"]) for info in infos
)


def _check_no_writers(lock, info, path_infos):
for path_info in path_infos:
blocking_urls = lfilter(path_info.overlaps, lock["write"])
if not blocking_urls:
continue

writers = lmap(lock["write"].get, blocking_urls)
writers = lfilter(lambda i: info != i, writers)
if not writers:
continue

raise LockError(
"'{}' is busy, it is being written to by:\n{}".format(
str(path_info), _infos_to_str(writers)
)
)


def _check_no_readers(lock, info, path_infos):
for path_info in path_infos:
blocking_urls = lfilter(path_info.overlaps, lock["read"])
if not blocking_urls:
continue

readers = lmapcat(lock["read"].get, blocking_urls)
readers = lfilter(lambda i: info != i, readers)
if not readers:
continue

raise LockError(
"'{}' is busy, it is being read by:\n{}".format(
str(path_info), _infos_to_str(readers)
)
)


def _acquire_read(lock, info, path_infos):
changes = []

for path_info in path_infos:
url = path_info.url

readers = lock["read"][url]
if info in readers:
continue

changes.append(url)
readers.append(info)

return changes


def _acquire_write(lock, info, path_infos):
changes = []

for path_info in path_infos:
url = path_info.url

if lock["write"][url] == info:
continue

changes.append(url)
lock["write"][url] = info

return changes


def _release_write(lock, info, changes):
for url in changes:
assert "write" in lock
assert url in lock["write"]
assert lock["write"][url] == info
del lock["write"][url]
if not lock["write"]:
del lock["write"]


def _release_read(lock, info, changes):
for url in changes:
assert "read" in lock
assert url in lock["read"]
assert info in lock["read"][url]
lock["read"][url].remove(info)
if not lock["read"][url]:
del lock["read"][url]
if not lock["read"]:
del lock["read"]


@contextmanager
def rwlock(tmp_dir, cmd, read, write):
"""Create non-thread-safe RWLock for PathInfos.
Args:
tmp_dir (str): existing directory where to create the rwlock file.
cmd (str): command that will be working on these PathInfos.
read ([PathInfo]): PathInfos that are going to be read.
write ([PathInfo]): PathInfos that are going to be written.
Raises:
LockError: raised if PathInfo we want to read is being written to by
another command or if PathInfo we want to write is being written
to or read from by another command.
RWLockFileCorruptedError: raised if rwlock file is not a valid JSON.
RWLockFileFormatError: raised if rwlock file is a valid JSON, but
has internal format that doesn't pass our schema validation.
"""
info = {"pid": os.getpid(), "cmd": cmd}

with _edit_rwlock(tmp_dir) as lock:
_check_no_writers(lock, info, read + write)
_check_no_readers(lock, info, write)

rchanges = _acquire_read(lock, info, read)
wchanges = _acquire_write(lock, info, write)

try:
yield
finally:
with _edit_rwlock(tmp_dir) as lock:
_release_write(lock, info, wchanges)
_release_read(lock, info, rchanges)
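
For illustration, a hypothetical usage sketch of the new rwlock context manager (not part of the committed diff). The command string, paths, and run_the_command helper are made up, and tmp_dir is assumed to be the repo's .dvc/tmp directory that Repo.__init__ now creates, as shown above.

from dvc.path_info import PathInfo
from dvc.rwlock import rwlock

tmp_dir = ".dvc/tmp"  # assumed to exist already (Repo.__init__ creates it)

reads = [PathInfo("data/raw")]        # paths the command only reads
writes = [PathInfo("data/features")]  # paths the command will write

with rwlock(tmp_dir, "dvc run featurize", reads, writes):
    # While this block is active, another DVC process whose rwlock request
    # reads or writes "data/features", or writes to "data/raw", raises
    # LockError instead of racing this command.
    run_the_command()  # hypothetical placeholder for the actual work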