From f0eea60eab2580ff2b484fa18d516f3cd763d242 Mon Sep 17 00:00:00 2001 From: John Sirois Date: Sun, 26 Feb 2023 20:30:35 -0800 Subject: [PATCH] Wrap inter-process locks in in-process locks. (#2070) This is needed to have independent POSIX fcntl locks in the same process by multiple threads and also needed whenever BSD flock locks silently use fcntl emulation (exotic systems or under NFS mounts). Pex is designed to avoid multi-threading when using POSIX locks; so this just serves as a design-error backstop for those style locks. For silently emulated BSD locks, this provides correctness. Analysis of #2066 and #1969 do not point to this enhancement solving any existing problems, but this is an improvement for the cases mentioned should we hit them. Work regarding #2066. --- pex/atomic_directory.py | 89 +++++++++++++++++++++++++++++++---------- 1 file changed, 68 insertions(+), 21 deletions(-) diff --git a/pex/atomic_directory.py b/pex/atomic_directory.py index 5a840f966..8489b6098 100644 --- a/pex/atomic_directory.py +++ b/pex/atomic_directory.py @@ -16,7 +16,11 @@ from pex.typing import TYPE_CHECKING, cast if TYPE_CHECKING: - from typing import Callable, Iterator, Optional + from typing import Callable, Dict, Iterator, Optional + + import attr # vendor:skip +else: + from pex.third_party import attr class AtomicDirectory(object): @@ -140,6 +144,68 @@ def _is_bsd_lock(lock_style=None): return file_lock_style is FileLockStyle.BSD +@attr.s(frozen=True) +class _FileLock(object): + _path = attr.ib() # type: str + _style = attr.ib(default=None) # type: Optional[FileLockStyle.Value] + _in_process_lock = attr.ib(factory=threading.Lock, init=False, eq=False) + + def acquire(self): + # type: () -> Callable[[], None] + self._in_process_lock.acquire() + + # N.B.: We don't actually write anything to the lock file but the fcntl file locking + # operations only work on files opened for at least write. + lock_fd = os.open(self._path, os.O_CREAT | os.O_WRONLY) + + lock_api = cast( + "Callable[[int, int], None]", + fcntl.flock if _is_bsd_lock(self._style) else fcntl.lockf, + ) + + # N.B.: Since lockf and flock operate on an open file descriptor and these are + # guaranteed to be closed by the operating system when the owning process exits, + # this lock is immune to staleness. + lock_api(lock_fd, fcntl.LOCK_EX) # A blocking write lock. + + def release(): + try: + lock_api(lock_fd, fcntl.LOCK_UN) + finally: + os.close(lock_fd) + self._in_process_lock.release() + + return release + + +@attr.s(frozen=True, eq=False) +class _LockManager(object): + _lock = attr.ib(factory=threading.Lock, init=False) # type: threading.Lock + _file_locks = attr.ib(factory=dict, init=False) # type: Dict[str, _FileLock] + + def lock( + self, + file_path, # type: str + lock_style=None, # type: Optional[FileLockStyle.Value] + ): + # type: (...) -> Callable[[], None] + """Locks file path exclusively and returns a callable that can be invoked to unlock it. + + The lock obtained is cross-thread and cross-process and is automatically released when + this process terminates. + """ + with self._lock: + file_lock = self._file_locks.get(file_path) + if file_lock is None: + file_lock = _FileLock(file_path, style=lock_style) + self._file_locks[file_path] = file_lock + + return file_lock.acquire() + + +_LOCK_MANAGER = _LockManager() + + @contextmanager def atomic_directory( target_dir, # type: str @@ -176,26 +242,7 @@ def atomic_directory( safe_mkdir(head) lockfile = os.path.join(head, ".{}.atomic_directory.lck".format(tail or "here")) - # N.B.: We don't actually write anything to the lock file but the fcntl file locking - # operations only work on files opened for at least write. - lock_fd = os.open(lockfile, os.O_CREAT | os.O_WRONLY) - - lock_api = cast( - "Callable[[int, int], None]", - fcntl.flock if _is_bsd_lock(lock_style) else fcntl.lockf, - ) - - def unlock(): - # type: () -> None - try: - lock_api(lock_fd, fcntl.LOCK_UN) - finally: - os.close(lock_fd) - - # N.B.: Since lockf and flock operate on an open file descriptor and these are - # guaranteed to be closed by the operating system when the owning process exits, - # this lock is immune to staleness. - lock_api(lock_fd, fcntl.LOCK_EX) # A blocking write lock. + unlock = _LOCK_MANAGER.lock(file_path=lockfile, lock_style=lock_style) if atomic_dir.is_finalized(): # We lost the double-checked locking race and our work was done for us by the race # winner so exit early.