diff --git a/pex/atomic_directory.py b/pex/atomic_directory.py index 5a840f966..8489b6098 100644 --- a/pex/atomic_directory.py +++ b/pex/atomic_directory.py @@ -16,7 +16,11 @@ from pex.typing import TYPE_CHECKING, cast if TYPE_CHECKING: - from typing import Callable, Iterator, Optional + from typing import Callable, Dict, Iterator, Optional + + import attr # vendor:skip +else: + from pex.third_party import attr class AtomicDirectory(object): @@ -140,6 +144,68 @@ def _is_bsd_lock(lock_style=None): return file_lock_style is FileLockStyle.BSD +@attr.s(frozen=True) +class _FileLock(object): + _path = attr.ib() # type: str + _style = attr.ib(default=None) # type: Optional[FileLockStyle.Value] + _in_process_lock = attr.ib(factory=threading.Lock, init=False, eq=False) + + def acquire(self): + # type: () -> Callable[[], None] + self._in_process_lock.acquire() + + # N.B.: We don't actually write anything to the lock file but the fcntl file locking + # operations only work on files opened for at least write. + lock_fd = os.open(self._path, os.O_CREAT | os.O_WRONLY) + + lock_api = cast( + "Callable[[int, int], None]", + fcntl.flock if _is_bsd_lock(self._style) else fcntl.lockf, + ) + + # N.B.: Since lockf and flock operate on an open file descriptor and these are + # guaranteed to be closed by the operating system when the owning process exits, + # this lock is immune to staleness. + lock_api(lock_fd, fcntl.LOCK_EX) # A blocking write lock. + + def release(): + try: + lock_api(lock_fd, fcntl.LOCK_UN) + finally: + os.close(lock_fd) + self._in_process_lock.release() + + return release + + +@attr.s(frozen=True, eq=False) +class _LockManager(object): + _lock = attr.ib(factory=threading.Lock, init=False) # type: threading.Lock + _file_locks = attr.ib(factory=dict, init=False) # type: Dict[str, _FileLock] + + def lock( + self, + file_path, # type: str + lock_style=None, # type: Optional[FileLockStyle.Value] + ): + # type: (...) -> Callable[[], None] + """Locks file path exclusively and returns a callable that can be invoked to unlock it. + + The lock obtained is cross-thread and cross-process and is automatically released when + this process terminates. + """ + with self._lock: + file_lock = self._file_locks.get(file_path) + if file_lock is None: + file_lock = _FileLock(file_path, style=lock_style) + self._file_locks[file_path] = file_lock + + return file_lock.acquire() + + +_LOCK_MANAGER = _LockManager() + + @contextmanager def atomic_directory( target_dir, # type: str @@ -176,26 +242,7 @@ def atomic_directory( safe_mkdir(head) lockfile = os.path.join(head, ".{}.atomic_directory.lck".format(tail or "here")) - # N.B.: We don't actually write anything to the lock file but the fcntl file locking - # operations only work on files opened for at least write. - lock_fd = os.open(lockfile, os.O_CREAT | os.O_WRONLY) - - lock_api = cast( - "Callable[[int, int], None]", - fcntl.flock if _is_bsd_lock(lock_style) else fcntl.lockf, - ) - - def unlock(): - # type: () -> None - try: - lock_api(lock_fd, fcntl.LOCK_UN) - finally: - os.close(lock_fd) - - # N.B.: Since lockf and flock operate on an open file descriptor and these are - # guaranteed to be closed by the operating system when the owning process exits, - # this lock is immune to staleness. - lock_api(lock_fd, fcntl.LOCK_EX) # A blocking write lock. + unlock = _LOCK_MANAGER.lock(file_path=lockfile, lock_style=lock_style) if atomic_dir.is_finalized(): # We lost the double-checked locking race and our work was done for us by the race # winner so exit early.