From e384685715e701fb43616b31158cfeee4fbaafd5 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Thu, 28 Nov 2019 18:03:19 +0200 Subject: [PATCH] dvc: release locks when running a command Currently only one instance of dvc could run in a particular repo because of the `.dvc/lock`. That gets furstrating when you are running some long-running command and blocked from doing anything else. This PR solved that issue for `dvc run` by releasing the repo lock while running the command. To protect against conflicts, we now use rwlock, which keeps the list of readers and writers for particular paths. The process looks like this: 1) aquire read locks for deps; 2) aquire write locks for outs; 3) release repo lock; 4) run cmd; 5) aquire repo lock; 6) release write locks for outs; 7) release read locks for outs; We also take rwlocks for operations such as `import`, `add` and `checkout` to ensure that we won't hit the same files that are being used by another `dvc run` in the background. We don't aquire rwlocks for the stage file itself, as we don't really care about anyone overwriting a stage file, as there is a very little difference here between parallel runs and sequential runs that overwrite the same stage file. Next steps might be to release repo lock on `import`(when dowloading stuff) and when computing checksums or copying files. Fixes #755 --- dvc/lock.py | 9 +- dvc/path_info.py | 13 ++- dvc/repo/__init__.py | 14 +-- dvc/rwlock.py | 191 ++++++++++++++++++++++++++++++++++++++ dvc/stage.py | 59 +++++++++++- dvc/utils/compat.py | 2 + tests/func/test_run.py | 2 +- tests/unit/test_rwlock.py | 98 +++++++++++++++++++ tests/unit/test_stage.py | 10 +- 9 files changed, 373 insertions(+), 25 deletions(-) create mode 100644 dvc/rwlock.py create mode 100644 tests/unit/test_rwlock.py diff --git a/dvc/lock.py b/dvc/lock.py index b7c7560f73..a8fc08c015 100644 --- a/dvc/lock.py +++ b/dvc/lock.py @@ -7,10 +7,9 @@ from datetime import timedelta import zc.lockfile -from funcy.py3 import lkeep from dvc.exceptions import DvcException -from dvc.utils import makedirs, format_link +from dvc.utils import format_link from dvc.utils.compat import is_py3, is_py2 from dvc.progress import Tqdm @@ -93,8 +92,6 @@ def __init__(self, lockfile, tmp_dir=None, **kwargs): import socket self._tmp_dir = tmp_dir - if self._tmp_dir is not None: - makedirs(self._tmp_dir, exist_ok=True) # NOTE: this is basically Lock.__init__ copy-paste, except that # instead of using `socket.getfqdn()` we use `socket.gethostname()` @@ -120,10 +117,6 @@ def __init__(self, lockfile, tmp_dir=None, **kwargs): def lockfile(self): return self._lockfile - @property - def files(self): - return lkeep([self._lockfile, self._tmp_dir]) - def lock(self): try: super(Lock, self).lock(timedelta(seconds=DEFAULT_TIMEOUT)) diff --git a/dvc/path_info.py b/dvc/path_info.py index 8602417789..773ae7dcff 100644 --- a/dvc/path_info.py +++ b/dvc/path_info.py @@ -23,7 +23,16 @@ fs_encoding = "utf-8" -class PathInfo(pathlib.PurePath): +class _BasePath(object): + def overlaps(self, other): + if isinstance(other, basestring): + other = self.__class__(other) + elif self.__class__ != other.__class__: + return False + return self == other or self.isin(other) or other.isin(self) + + +class PathInfo(pathlib.PurePath, _BasePath): # Use __slots__ in PathInfo objects following PurePath implementation. # This makes objects smaller and speeds up attribute access. # We don't add any fields so it's empty. @@ -138,7 +147,7 @@ def __repr__(self): return "<{}.parents>".format(self.src) -class URLInfo(object): +class URLInfo(_BasePath): DEFAULT_PORTS = {"http": 80, "https": 443, "ssh": 22, "hdfs": 0} def __init__(self, url): diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 86f03ac5e9..8a0afc2d3c 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -76,6 +76,7 @@ def __init__(self, root_dir=None): from dvc.repo.metrics import Metrics from dvc.scm.tree import WorkingTree from dvc.repo.tag import Tag + from dvc.utils import makedirs root_dir = self.find_root(root_dir) @@ -88,6 +89,9 @@ def __init__(self, root_dir=None): self.tree = WorkingTree(self.root_dir) + self.tmp_dir = os.path.join(self.dvc_dir, "tmp") + makedirs(self.tmp_dir, exist_ok=True) + hardlink_lock = self.config.config["core"].get("hardlink_lock", False) self.lock = make_lock( os.path.join(self.dvc_dir, "lock"), @@ -95,6 +99,7 @@ def __init__(self, root_dir=None): hardlink_lock=hardlink_lock, friendly=True, ) + # NOTE: storing state and link_state in the repository itself to avoid # any possible state corruption in 'shared cache dir' scenario. self.state = State(self, self.config.config) @@ -165,9 +170,8 @@ def _ignore(self): flist = ( [self.config.config_local_file, updater.updater_file] + + [self.lock.lockfile, updater.lock.lockfile, self.tmp_dir] + self.state.files - + self.lock.files - + updater.lock.files ) if path_isin(self.cache.local.cache_dir, self.root_dir): @@ -352,11 +356,7 @@ def _collect_graph(self, stages=None): continue for out in outs: - if ( - out == dep.path_info - or dep.path_info.isin(out) - or out.isin(dep.path_info) - ): + if out.overlaps(dep.path_info): dep_stage = outs[out].stage dep_node = relpath(dep_stage.path, self.root_dir) G.add_node(dep_node, stage=dep_stage) diff --git a/dvc/rwlock.py b/dvc/rwlock.py new file mode 100644 index 0000000000..29b7bce88f --- /dev/null +++ b/dvc/rwlock.py @@ -0,0 +1,191 @@ +from __future__ import unicode_literals + +import os +import json + +from collections import defaultdict +from contextlib import contextmanager + +from voluptuous import Schema, Invalid +from funcy.py3 import lmap, lfilter, lmapcat + +from .exceptions import DvcException +from .lock import LockError +from .utils.compat import ( + convert_to_unicode, + FileNotFoundError, + JSONDecodeError, + str, +) +from .utils.fs import relpath + +INFO_SCHEMA = {"pid": int, "cmd": str} + +SCHEMA = Schema({"write": {str: INFO_SCHEMA}, "read": {str: [INFO_SCHEMA]}}) + + +class RWLockFileCorruptedError(DvcException): + def __init__(self, path, cause): + super(RWLockFileCorruptedError, self).__init__( + "Unable to read RWLock-file '{}'. JSON structure is " + "corrupted".format(relpath(path)), + cause=cause, + ) + + +class RWLockFileFormatError(DvcException): + def __init__(self, path, cause): + super(RWLockFileFormatError, self).__init__( + "RWLock-file '{}' format error.".format(relpath(path)), cause=cause + ) + + +@contextmanager +def _edit_rwlock(lock_dir): + path = os.path.join(lock_dir, "rwlock") + try: + with open(path, "r") as fobj: + lock = json.load(fobj) + lock = SCHEMA(convert_to_unicode(lock)) + except FileNotFoundError: + lock = SCHEMA({}) + except JSONDecodeError as exc: + raise RWLockFileCorruptedError(path, cause=exc) + except Invalid as exc: + raise RWLockFileFormatError(path, cause=exc) + lock = defaultdict(dict, lock) + lock["read"] = defaultdict(list, lock["read"]) + lock["write"] = defaultdict(dict, lock["write"]) + yield lock + with open(path, "w+") as fobj: + json.dump(lock, fobj) + + +def _infos_to_str(infos): + return "\n".join( + " (PID {}): {}".format(info["pid"], info["cmd"]) for info in infos + ) + + +def _check_no_writers(lock, info, path_infos): + for path_info in path_infos: + blocking_urls = lfilter(path_info.overlaps, lock["write"]) + if not blocking_urls: + continue + + writers = lmap(lock["write"].get, blocking_urls) + writers = lfilter(lambda i: info != i, writers) + if not writers: + continue + + raise LockError( + "'{}' is busy, it is being written to by:\n{}".format( + str(path_info), _infos_to_str(writers) + ) + ) + + +def _check_no_readers(lock, info, path_infos): + for path_info in path_infos: + blocking_urls = lfilter(path_info.overlaps, lock["read"]) + if not blocking_urls: + continue + + readers = lmapcat(lock["read"].get, blocking_urls) + readers = lfilter(lambda i: info != i, readers) + if not readers: + continue + + raise LockError( + "'{}' is busy, it is being read by:\n{}".format( + str(path_info), _infos_to_str(readers) + ) + ) + + +def _acquire_read(lock, info, path_infos): + changes = [] + + for path_info in path_infos: + url = path_info.url + + readers = lock["read"][url] + if info in readers: + continue + + changes.append(url) + readers.append(info) + + return changes + + +def _acquire_write(lock, info, path_infos): + changes = [] + + for path_info in path_infos: + url = path_info.url + + if lock["write"][url] == info: + continue + + changes.append(url) + lock["write"][url] = info + + return changes + + +def _release_write(lock, info, changes): + for url in changes: + assert "write" in lock + assert url in lock["write"] + assert lock["write"][url] == info + del lock["write"][url] + if not lock["write"]: + del lock["write"] + + +def _release_read(lock, info, changes): + for url in changes: + assert "read" in lock + assert url in lock["read"] + assert info in lock["read"][url] + lock["read"][url].remove(info) + if not lock["read"][url]: + del lock["read"][url] + if not lock["read"]: + del lock["read"] + + +@contextmanager +def rwlock(tmp_dir, cmd, read, write): + """Create non-thread-safe RWLock for PathInfos. + + Args: + tmp_dir (str): existing directory where to create the rwlock file. + cmd (str): command that will be working on these PathInfos. + read ([PathInfo]): PathInfos that are going to be read. + write ([PathInfo]): PathInfos that are going to be written. + + Raises: + LockError: raised if PathInfo we want to read is being written to by + another command or if PathInfo we want to write is being written + to or read from by another command. + RWLockFileCorruptedError: raised if rwlock file is not a valid JSON. + RWLockFileFormatError: raised if rwlock file is a valid JSON, but + has internal format that doesn't pass our schema validation. + """ + info = {"pid": os.getpid(), "cmd": cmd} + + with _edit_rwlock(tmp_dir) as lock: + _check_no_writers(lock, info, read + write) + _check_no_readers(lock, info, write) + + rchanges = _acquire_read(lock, info, read) + wchanges = _acquire_write(lock, info, write) + + try: + yield + finally: + with _edit_rwlock(tmp_dir) as lock: + _release_write(lock, info, wchanges) + _release_read(lock, info, rchanges) diff --git a/dvc/stage.py b/dvc/stage.py index c3c5daec9c..ce0a9668aa 100644 --- a/dvc/stage.py +++ b/dvc/stage.py @@ -7,7 +7,10 @@ import signal import subprocess import threading + +from functools import wraps from itertools import chain +from funcy import decorator from voluptuous import Any, Schema, MultipleInvalid @@ -129,6 +132,52 @@ def __init__(self, missing_files): super(MissingDataSource, self).__init__(msg) +@decorator +def rwlocked(call, read=None, write=None): + import sys + from dvc.rwlock import rwlock + from dvc.dependency.repo import DependencyREPO + + if read is None: + read = [] + + if write is None: + write = [] + + stage = call._args[0] + + def _chain(names): + return [ + item.path_info + for attr in names + for item in getattr(stage, attr) + # There is no need to lock DependencyREPO deps, as there is no + # corresponding OutputREPO, so we can't even write it. + if not isinstance(item, DependencyREPO) + ] + + cmd = " ".join(sys.argv) + + with rwlock(stage.repo.tmp_dir, cmd, _chain(read), _chain(write)): + return call() + + +def unlocked_repo(f): + @wraps(f) + def wrapper(stage, *args, **kwargs): + stage.repo.state.dump() + stage.repo.lock.unlock() + stage.repo._reset() + try: + ret = f(stage, *args, **kwargs) + finally: + stage.repo.lock.lock() + stage.repo.state.load() + return ret + + return wrapper + + class Stage(object): STAGE_FILE = "Dvcfile" STAGE_FILE_SUFFIX = ".dvc" @@ -286,6 +335,7 @@ def _changed_md5(self): return True return False + @rwlocked(read=["deps", "outs"]) def changed(self): # Short-circuit order: stage md5 is fast, deps are expected to change ret = ( @@ -299,6 +349,7 @@ def changed(self): return ret + @rwlocked(write=["outs"]) def remove_outs(self, ignore_remove=False, force=False): """Used mainly for `dvc remove --outs` and :func:`Stage.reproduce`.""" for out in self.outs: @@ -316,6 +367,7 @@ def unprotect_outs(self): for out in self.outs: out.unprotect() + @rwlocked(write=["outs"]) def remove(self, force=False, remove_outs=True): if remove_outs: self.remove_outs(ignore_remove=True, force=force) @@ -323,6 +375,7 @@ def remove(self, force=False, remove_outs=True): self.unprotect_outs() os.unlink(self.path) + @rwlocked(read=["deps"], write=["outs"]) def reproduce(self, interactive=False, **kwargs): if not kwargs.get("force", False) and not self.changed(): @@ -755,6 +808,7 @@ def check_can_commit(self, force): ) self.save() + @rwlocked(write=["outs"]) def commit(self): for out in self.outs: out.commit() @@ -801,6 +855,7 @@ def _check_duplicated_arguments(self): if occurrence > 1: raise ArgumentDuplicationError(str(path)) + @unlocked_repo def _run(self): self._check_missing_deps() @@ -850,6 +905,7 @@ def _run(self): if (p is None) or (p.returncode != 0): raise StageCmdFailedError(self) + @rwlocked(read=["deps"], write=["outs"]) def run(self, dry=False, no_commit=False, force=False): if (self.cmd or self.is_import) and not self.locked and not dry: self.remove_outs(ignore_remove=False, force=False) @@ -874,7 +930,6 @@ def run(self, dry=False, no_commit=False, force=False): self.outs[0].checkout() else: self.deps[0].download(self.outs[0]) - elif self.is_data_source: msg = "Verifying data sources in '{}'".format(self.relpath) logger.info(msg) @@ -904,6 +959,7 @@ def check_missing_outputs(self): if paths: raise MissingDataSource(paths) + @rwlocked(write=["outs"]) def checkout(self, force=False, progress_callback=None): failed_checkouts = [] for out in self.outs: @@ -923,6 +979,7 @@ def _status(entries): return ret + @rwlocked(read=["deps", "outs"]) def status(self): ret = [] diff --git a/dvc/utils/compat.py b/dvc/utils/compat.py index 372c8d2c60..a4819fccc5 100644 --- a/dvc/utils/compat.py +++ b/dvc/utils/compat.py @@ -123,6 +123,7 @@ def ignore_file_not_found(): makedirs = _makedirs range = xrange # noqa: F821 FileNotFoundError = IOError + JSONDecodeError = ValueError import StringIO import io @@ -164,6 +165,7 @@ def convert_to_unicode(data): import configparser as ConfigParser # noqa: F401 from collections.abc import Mapping # noqa: F401 from contextlib import ExitStack # noqa: F401 + from json.decoder import JSONDecodeError # noqa: F401 builtin_str = str # noqa: F821 str = str # noqa: F821 diff --git a/tests/func/test_run.py b/tests/func/test_run.py index 7d9a51f29e..61db6c98e7 100644 --- a/tests/func/test_run.py +++ b/tests/func/test_run.py @@ -893,7 +893,7 @@ def test(self): ) patch_run = mock.patch.object(stage, "_run", wraps=stage._run) - with self.dvc.state: + with self.dvc.lock, self.dvc.state: with patch_checkout as mock_checkout: with patch_run as mock_run: stage.run() diff --git a/tests/unit/test_rwlock.py b/tests/unit/test_rwlock.py new file mode 100644 index 0000000000..9e554960ef --- /dev/null +++ b/tests/unit/test_rwlock.py @@ -0,0 +1,98 @@ +from __future__ import unicode_literals + +import os + +import pytest + +from dvc.rwlock import ( + rwlock, + _edit_rwlock, + RWLockFileFormatError, + RWLockFileCorruptedError, +) +from dvc.lock import LockError +from dvc.utils.compat import fspath +from dvc.path_info import PathInfo + + +def test_rwlock(tmp_path): + path = fspath(tmp_path) + foo = PathInfo("foo") + + with rwlock(path, "cmd1", [foo], []): + with pytest.raises(LockError): + with rwlock(path, "cmd2", [], [foo]): + pass + + with rwlock(path, "cmd1", [], [foo]): + with pytest.raises(LockError): + with rwlock(path, "cmd2", [foo], []): + pass + + with rwlock(path, "cmd1", [], [foo]): + with pytest.raises(LockError): + with rwlock(path, "cmd2", [], [foo]): + pass + + +def test_rwlock_reentrant(tmp_path): + path = fspath(tmp_path) + foo = PathInfo("foo") + + with rwlock(path, "cmd1", [], [foo]): + with rwlock(path, "cmd1", [], [foo]): + pass + with _edit_rwlock(path) as lock: + assert lock == { + "read": {}, + "write": {"foo": {"cmd": "cmd1", "pid": os.getpid()}}, + } + + with rwlock(path, "cmd", [foo], []): + with rwlock(path, "cmd", [foo], []): + pass + with _edit_rwlock(path) as lock: + assert lock == { + "read": {"foo": [{"cmd": "cmd", "pid": os.getpid()}]}, + "write": {}, + } + + +def test_rwlock_subdirs(tmp_path): + path = fspath(tmp_path) + foo = PathInfo("foo") + subfoo = PathInfo("foo/subfoo") + + with rwlock(path, "cmd1", [foo], []): + with pytest.raises(LockError): + with rwlock(path, "cmd2", [], [subfoo]): + pass + + with rwlock(path, "cmd1", [], [subfoo]): + with pytest.raises(LockError): + with rwlock(path, "cmd2", [foo], []): + pass + + with rwlock(path, "cmd1", [], [subfoo]): + with pytest.raises(LockError): + with rwlock(path, "cmd2", [], [foo]): + pass + + with rwlock(path, "cmd1", [subfoo], []): + with rwlock(path, "cmd2", [foo], []): + pass + + +def test_broken_rwlock(tmp_path): + dir_path = fspath(tmp_path) + path = tmp_path / "rwlock" + + path.write_text('{"broken": "format"}', encoding="utf-8") + with pytest.raises(RWLockFileFormatError): + with _edit_rwlock(dir_path): + pass + + path.write_text("{broken json", encoding="utf-8") + with pytest.raises(RWLockFileCorruptedError): + with _edit_rwlock(dir_path): + pass diff --git a/tests/unit/test_stage.py b/tests/unit/test_stage.py index 652c4b5290..354fb60078 100644 --- a/tests/unit/test_stage.py +++ b/tests/unit/test_stage.py @@ -93,16 +93,14 @@ def test_stage_update(mocker): not isinstance(threading.current_thread(), threading._MainThread), reason="Not running in the main thread.", ) -def test_stage_run_ignore_sigint(mocker): - stage = Stage(None, "path") - +def test_stage_run_ignore_sigint(dvc_repo, mocker): proc = mocker.Mock() communicate = mocker.Mock() proc.configure_mock(returncode=0, communicate=communicate) popen = mocker.patch.object(subprocess, "Popen", return_value=proc) signal_mock = mocker.patch("signal.signal") - stage._run() + dvc_repo.run(cmd="path") assert popen.called_once() assert communicate.called_once_with() @@ -110,8 +108,8 @@ def test_stage_run_ignore_sigint(mocker): assert signal.getsignal(signal.SIGINT) == signal.default_int_handler -def test_always_changed(): - stage = Stage(None, "path", always_changed=True) +def test_always_changed(dvc_repo): + stage = Stage(dvc_repo, "path", always_changed=True) stage.save() assert stage.changed() assert stage.status()["path"] == ["always changed"]