From 5c7075a67dcdfd234291e89273a92d253dd57671 Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 9 Feb 2023 15:50:34 -0800 Subject: [PATCH] Fix `execute_parallel` "leaking" a thread. Although the thread was not leaked per-se, it could run after `execute_parallel` returned which could cause parallel atomic_directory posix locks to fail. Fixes one case in #1969. --- pex/jobs.py | 92 ++++++++++++++++++---------- tests/integration/test_issue_1336.py | 23 ++++++- 2 files changed, 81 insertions(+), 34 deletions(-) diff --git a/pex/jobs.py b/pex/jobs.py index 734836a2a..2ab38bca1 100644 --- a/pex/jobs.py +++ b/pex/jobs.py @@ -6,6 +6,7 @@ import errno import subprocess from abc import abstractmethod +from contextlib import contextmanager from threading import BoundedSemaphore, Event, Thread from pex.compatibility import Queue, cpu_count @@ -509,44 +510,69 @@ def spawn_jobs(): spawn_queue.put(result) spawn_queue.put(done_sentinel) - spawner = Thread(name="PEX Parallel Job Spawner", target=spawn_jobs) - spawner.daemon = True - spawner.start() + @contextmanager + def spawned_jobs(): + spawner = Thread(name="PEX Parallel Job Spawner", target=spawn_jobs) + spawner.daemon = True + spawner.start() + try: + yield + finally: + stop.set() + # N.B.: We want to ensure, no matter what, the spawn_jobs loop above spins at least once + # so that it can see stop is set and exit in the case it is currently blocked on a put + # waiting for a job slot. + try: + job_slots.release() + except ValueError: + # From the BoundedSemaphore doc: + # + # If the number of releases exceeds the number of acquires, + # raise a ValueError. + # + # In the normal case there will be no job_slots to release; so we expect the + # BoundedSemaphore to raise here. We're guarding against the abnormal case where + # there is a bug in the state machine implied by the execute_parallel code. + pass + spawner.join() - error = None - while True: - spawn_result = spawn_queue.get() + with spawned_jobs(): + error = None + while True: + spawn_result = spawn_queue.get() - if isinstance(spawn_result, DoneSentinel): - if error: - raise error - return + if isinstance(spawn_result, DoneSentinel): + if error: + raise error + return - try: - if isinstance(spawn_result, SpawnError): - try: - se_result = handler.handle_spawn_error(spawn_result.item, spawn_result.error) - if se_result is not None: - yield se_result - except Exception as e: - # Fail fast and proceed to kill all outstanding spawned jobs. - stop.set() - error = e - elif ( - error is not None - ): # I.E.: `item` is not an exception, but there was a prior exception. - spawn_result.spawned_job.kill() - else: - try: - yield spawn_result.spawned_job.await_result() - except Job.Error as e: + try: + if isinstance(spawn_result, SpawnError): try: - je_result = handler.handle_job_error(spawn_result.item, e) - if je_result is not None: - yield je_result + se_result = handler.handle_spawn_error( + spawn_result.item, spawn_result.error + ) + if se_result is not None: + yield se_result except Exception as e: # Fail fast and proceed to kill all outstanding spawned jobs. stop.set() error = e - finally: - job_slots.release() + elif ( + error is not None + ): # I.E.: `item` is not an exception, but there was a prior exception. + spawn_result.spawned_job.kill() + else: + try: + yield spawn_result.spawned_job.await_result() + except Job.Error as e: + try: + je_result = handler.handle_job_error(spawn_result.item, e) + if je_result is not None: + yield je_result + except Exception as e: + # Fail fast and proceed to kill all outstanding spawned jobs. + stop.set() + error = e + finally: + job_slots.release() diff --git a/tests/integration/test_issue_1336.py b/tests/integration/test_issue_1336.py index 08937765a..87a3cc2dc 100644 --- a/tests/integration/test_issue_1336.py +++ b/tests/integration/test_issue_1336.py @@ -4,6 +4,7 @@ import os import subprocess +from pex.compatibility import commonpath from pex.testing import PY310, ensure_python_interpreter, run_pex_command from pex.typing import TYPE_CHECKING @@ -16,4 +17,24 @@ def test_pip_leak(tmpdir): python = ensure_python_interpreter(PY310) pip = os.path.join(os.path.dirname(python), "pip") subprocess.check_call(args=[pip, "install", "setuptools_scm==6.0.1"]) - run_pex_command(args=["--python", python, "bitstring==3.1.7"], python=python).assert_success() + + pex_root = os.path.join(str(tmpdir), "pex_root") + result = run_pex_command( + args=[ + "--pex-root", + pex_root, + "--runtime-pex-root", + pex_root, + "--python", + python, + "bitstring==3.1.7", + "--", + "-c", + "import bitstring, os; print(os.path.realpath(bitstring.__file__))", + ], + python=python, + ) + result.assert_success() + assert os.path.realpath(pex_root) == commonpath( + [os.path.realpath(pex_root), result.output.strip()] + )