Skip to content

Commit

Permalink
Wrap inter-process locks in in-process locks. (#2070)
Browse files Browse the repository at this point in the history
This is needed to have independent POSIX fcntl locks in the same process
by multiple threads and also needed whenever BSD flock locks silently
use fcntl emulation (exotic systems or under NFS mounts). Pex is
designed to avoid multi-threading when using POSIX locks; so this just
serves as a design-error backstop for those style locks. For silently
emulated BSD locks, this provides correctness.

Analysis of #2066 and #1969 do not point to this enhancement solving any
existing problems, but this is an improvement for the cases mentioned
should we hit them.

Work regarding #2066.
  • Loading branch information
jsirois authored Feb 27, 2023
1 parent 9c972c2 commit f0eea60
Showing 1 changed file with 68 additions and 21 deletions.
89 changes: 68 additions & 21 deletions pex/atomic_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
from pex.typing import TYPE_CHECKING, cast

if TYPE_CHECKING:
from typing import Callable, Iterator, Optional
from typing import Callable, Dict, Iterator, Optional

import attr # vendor:skip
else:
from pex.third_party import attr


class AtomicDirectory(object):
Expand Down Expand Up @@ -140,6 +144,68 @@ def _is_bsd_lock(lock_style=None):
return file_lock_style is FileLockStyle.BSD


@attr.s(frozen=True)
class _FileLock(object):
_path = attr.ib() # type: str
_style = attr.ib(default=None) # type: Optional[FileLockStyle.Value]
_in_process_lock = attr.ib(factory=threading.Lock, init=False, eq=False)

def acquire(self):
# type: () -> Callable[[], None]
self._in_process_lock.acquire()

# N.B.: We don't actually write anything to the lock file but the fcntl file locking
# operations only work on files opened for at least write.
lock_fd = os.open(self._path, os.O_CREAT | os.O_WRONLY)

lock_api = cast(
"Callable[[int, int], None]",
fcntl.flock if _is_bsd_lock(self._style) else fcntl.lockf,
)

# N.B.: Since lockf and flock operate on an open file descriptor and these are
# guaranteed to be closed by the operating system when the owning process exits,
# this lock is immune to staleness.
lock_api(lock_fd, fcntl.LOCK_EX) # A blocking write lock.

def release():
try:
lock_api(lock_fd, fcntl.LOCK_UN)
finally:
os.close(lock_fd)
self._in_process_lock.release()

return release


@attr.s(frozen=True, eq=False)
class _LockManager(object):
_lock = attr.ib(factory=threading.Lock, init=False) # type: threading.Lock
_file_locks = attr.ib(factory=dict, init=False) # type: Dict[str, _FileLock]

def lock(
self,
file_path, # type: str
lock_style=None, # type: Optional[FileLockStyle.Value]
):
# type: (...) -> Callable[[], None]
"""Locks file path exclusively and returns a callable that can be invoked to unlock it.
The lock obtained is cross-thread and cross-process and is automatically released when
this process terminates.
"""
with self._lock:
file_lock = self._file_locks.get(file_path)
if file_lock is None:
file_lock = _FileLock(file_path, style=lock_style)
self._file_locks[file_path] = file_lock

return file_lock.acquire()


_LOCK_MANAGER = _LockManager()


@contextmanager
def atomic_directory(
target_dir, # type: str
Expand Down Expand Up @@ -176,26 +242,7 @@ def atomic_directory(
safe_mkdir(head)
lockfile = os.path.join(head, ".{}.atomic_directory.lck".format(tail or "here"))

# N.B.: We don't actually write anything to the lock file but the fcntl file locking
# operations only work on files opened for at least write.
lock_fd = os.open(lockfile, os.O_CREAT | os.O_WRONLY)

lock_api = cast(
"Callable[[int, int], None]",
fcntl.flock if _is_bsd_lock(lock_style) else fcntl.lockf,
)

def unlock():
# type: () -> None
try:
lock_api(lock_fd, fcntl.LOCK_UN)
finally:
os.close(lock_fd)

# N.B.: Since lockf and flock operate on an open file descriptor and these are
# guaranteed to be closed by the operating system when the owning process exits,
# this lock is immune to staleness.
lock_api(lock_fd, fcntl.LOCK_EX) # A blocking write lock.
unlock = _LOCK_MANAGER.lock(file_path=lockfile, lock_style=lock_style)
if atomic_dir.is_finalized():
# We lost the double-checked locking race and our work was done for us by the race
# winner so exit early.
Expand Down

0 comments on commit f0eea60

Please sign in to comment.