Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvc: release locks when running a command #2584

Merged
merged 1 commit into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions dvc/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
from datetime import timedelta

import zc.lockfile
from funcy.py3 import lkeep

from dvc.exceptions import DvcException
from dvc.utils import makedirs, format_link
from dvc.utils import format_link
from dvc.utils.compat import is_py3, is_py2
from dvc.progress import Tqdm

Expand Down Expand Up @@ -93,8 +92,6 @@ def __init__(self, lockfile, tmp_dir=None, **kwargs):
import socket

self._tmp_dir = tmp_dir
if self._tmp_dir is not None:
makedirs(self._tmp_dir, exist_ok=True)

# NOTE: this is basically Lock.__init__ copy-paste, except that
# instead of using `socket.getfqdn()` we use `socket.gethostname()`
Expand All @@ -120,10 +117,6 @@ def __init__(self, lockfile, tmp_dir=None, **kwargs):
def lockfile(self):
return self._lockfile

@property
def files(self):
return lkeep([self._lockfile, self._tmp_dir])

def lock(self):
try:
super(Lock, self).lock(timedelta(seconds=DEFAULT_TIMEOUT))
Expand Down
13 changes: 11 additions & 2 deletions dvc/path_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@
fs_encoding = "utf-8"


class PathInfo(pathlib.PurePath):
class _BasePath(object):
def overlaps(self, other):
if isinstance(other, basestring):
other = self.__class__(other)
elif self.__class__ != other.__class__:
return False
return self == other or self.isin(other) or other.isin(self)
Suor marked this conversation as resolved.
Show resolved Hide resolved


class PathInfo(pathlib.PurePath, _BasePath):
# Use __slots__ in PathInfo objects following PurePath implementation.
# This makes objects smaller and speeds up attribute access.
# We don't add any fields so it's empty.
Expand Down Expand Up @@ -138,7 +147,7 @@ def __repr__(self):
return "<{}.parents>".format(self.src)


class URLInfo(object):
class URLInfo(_BasePath):
DEFAULT_PORTS = {"http": 80, "https": 443, "ssh": 22, "hdfs": 0}

def __init__(self, url):
Expand Down
14 changes: 7 additions & 7 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(self, root_dir=None):
from dvc.repo.metrics import Metrics
from dvc.scm.tree import WorkingTree
from dvc.repo.tag import Tag
from dvc.utils import makedirs

root_dir = self.find_root(root_dir)

Expand All @@ -88,13 +89,17 @@ def __init__(self, root_dir=None):

self.tree = WorkingTree(self.root_dir)

self.tmp_dir = os.path.join(self.dvc_dir, "tmp")
makedirs(self.tmp_dir, exist_ok=True)

hardlink_lock = self.config.config["core"].get("hardlink_lock", False)
self.lock = make_lock(
os.path.join(self.dvc_dir, "lock"),
tmp_dir=os.path.join(self.dvc_dir, "tmp"),
hardlink_lock=hardlink_lock,
friendly=True,
)

# NOTE: storing state and link_state in the repository itself to avoid
# any possible state corruption in 'shared cache dir' scenario.
self.state = State(self, self.config.config)
Expand Down Expand Up @@ -165,9 +170,8 @@ def _ignore(self):

flist = (
[self.config.config_local_file, updater.updater_file]
+ [self.lock.lockfile, updater.lock.lockfile, self.tmp_dir]
+ self.state.files
+ self.lock.files
+ updater.lock.files
)

if path_isin(self.cache.local.cache_dir, self.root_dir):
Expand Down Expand Up @@ -352,11 +356,7 @@ def _collect_graph(self, stages=None):
continue

for out in outs:
if (
out == dep.path_info
or dep.path_info.isin(out)
or out.isin(dep.path_info)
):
if out.overlaps(dep.path_info):
dep_stage = outs[out].stage
dep_node = relpath(dep_stage.path, self.root_dir)
G.add_node(dep_node, stage=dep_stage)
Expand Down
191 changes: 191 additions & 0 deletions dvc/rwlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
from __future__ import unicode_literals

import os
import json

from collections import defaultdict
from contextlib import contextmanager

from voluptuous import Schema, Invalid
from funcy.py3 import lmap, lfilter, lmapcat

from .exceptions import DvcException
from .lock import LockError
from .utils.compat import (
convert_to_unicode,
FileNotFoundError,
JSONDecodeError,
str,
)
from .utils.fs import relpath

INFO_SCHEMA = {"pid": int, "cmd": str}

SCHEMA = Schema({"write": {str: INFO_SCHEMA}, "read": {str: [INFO_SCHEMA]}})


class RWLockFileCorruptedError(DvcException):
def __init__(self, path, cause):
super(RWLockFileCorruptedError, self).__init__(
"Unable to read RWLock-file '{}'. JSON structure is "
"corrupted".format(relpath(path)),
cause=cause,
)


class RWLockFileFormatError(DvcException):
def __init__(self, path, cause):
super(RWLockFileFormatError, self).__init__(
"RWLock-file '{}' format error.".format(relpath(path)), cause=cause
)


@contextmanager
def _edit_rwlock(lock_dir):
path = os.path.join(lock_dir, "rwlock")
try:
with open(path, "r") as fobj:
lock = json.load(fobj)
lock = SCHEMA(convert_to_unicode(lock))
except FileNotFoundError:
lock = SCHEMA({})
except JSONDecodeError as exc:
raise RWLockFileCorruptedError(path, cause=exc)
except Invalid as exc:
raise RWLockFileFormatError(path, cause=exc)
lock = defaultdict(dict, lock)
lock["read"] = defaultdict(list, lock["read"])
lock["write"] = defaultdict(dict, lock["write"])
yield lock
with open(path, "w+") as fobj:
json.dump(lock, fobj)


def _infos_to_str(infos):
return "\n".join(
" (PID {}): {}".format(info["pid"], info["cmd"]) for info in infos
)


def _check_no_writers(lock, info, path_infos):
for path_info in path_infos:
blocking_urls = lfilter(path_info.overlaps, lock["write"])
if not blocking_urls:
continue

writers = lmap(lock["write"].get, blocking_urls)
writers = lfilter(lambda i: info != i, writers)
if not writers:
continue

raise LockError(
"'{}' is busy, it is being written to by:\n{}".format(
str(path_info), _infos_to_str(writers)
)
)


def _check_no_readers(lock, info, path_infos):
for path_info in path_infos:
blocking_urls = lfilter(path_info.overlaps, lock["read"])
if not blocking_urls:
continue

readers = lmapcat(lock["read"].get, blocking_urls)
readers = lfilter(lambda i: info != i, readers)
if not readers:
continue

raise LockError(
"'{}' is busy, it is being read by:\n{}".format(
str(path_info), _infos_to_str(readers)
)
)


def _acquire_read(lock, info, path_infos):
changes = []

for path_info in path_infos:
url = path_info.url

readers = lock["read"][url]
if info in readers:
continue

changes.append(url)
readers.append(info)

return changes


def _acquire_write(lock, info, path_infos):
changes = []

for path_info in path_infos:
url = path_info.url

if lock["write"][url] == info:
continue

changes.append(url)
lock["write"][url] = info

return changes


def _release_write(lock, info, changes):
for url in changes:
assert "write" in lock
assert url in lock["write"]
assert lock["write"][url] == info
del lock["write"][url]
if not lock["write"]:
del lock["write"]
efiop marked this conversation as resolved.
Show resolved Hide resolved


def _release_read(lock, info, changes):
for url in changes:
assert "read" in lock
assert url in lock["read"]
assert info in lock["read"][url]
lock["read"][url].remove(info)
if not lock["read"][url]:
del lock["read"][url]
if not lock["read"]:
del lock["read"]


@contextmanager
def rwlock(tmp_dir, cmd, read, write):
"""Create non-thread-safe RWLock for PathInfos.

Args:
tmp_dir (str): existing directory where to create the rwlock file.
cmd (str): command that will be working on these PathInfos.
read ([PathInfo]): PathInfos that are going to be read.
write ([PathInfo]): PathInfos that are going to be written.

Raises:
LockError: raised if PathInfo we want to read is being written to by
another command or if PathInfo we want to write is being written
to or read from by another command.
RWLockFileCorruptedError: raised if rwlock file is not a valid JSON.
RWLockFileFormatError: raised if rwlock file is a valid JSON, but
has internal format that doesn't pass our schema validation.
"""
info = {"pid": os.getpid(), "cmd": cmd}

with _edit_rwlock(tmp_dir) as lock:
_check_no_writers(lock, info, read + write)
_check_no_readers(lock, info, write)

rchanges = _acquire_read(lock, info, read)
wchanges = _acquire_write(lock, info, write)

try:
yield
finally:
with _edit_rwlock(tmp_dir) as lock:
_release_write(lock, info, wchanges)
_release_read(lock, info, rchanges)
efiop marked this conversation as resolved.
Show resolved Hide resolved
Loading