Skip to content

Commit

Permalink
Fix execute_parallel "leaking" a thread.
Browse files Browse the repository at this point in the history
Although the thread was not leaked per-se, it could run after
`execute_parallel` returned which could cause parallel atomic_directory
posix locks to fail.

Fixes one case in pex-tool#1969.
  • Loading branch information
jsirois committed Feb 10, 2023
1 parent 97a2497 commit 5c7075a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 34 deletions.
92 changes: 59 additions & 33 deletions pex/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import errno
import subprocess
from abc import abstractmethod
from contextlib import contextmanager
from threading import BoundedSemaphore, Event, Thread

from pex.compatibility import Queue, cpu_count
Expand Down Expand Up @@ -509,44 +510,69 @@ def spawn_jobs():
spawn_queue.put(result)
spawn_queue.put(done_sentinel)

spawner = Thread(name="PEX Parallel Job Spawner", target=spawn_jobs)
spawner.daemon = True
spawner.start()
@contextmanager
def spawned_jobs():
spawner = Thread(name="PEX Parallel Job Spawner", target=spawn_jobs)
spawner.daemon = True
spawner.start()
try:
yield
finally:
stop.set()
# N.B.: We want to ensure, no matter what, the spawn_jobs loop above spins at least once
# so that it can see stop is set and exit in the case it is currently blocked on a put
# waiting for a job slot.
try:
job_slots.release()
except ValueError:
# From the BoundedSemaphore doc:
#
# If the number of releases exceeds the number of acquires,
# raise a ValueError.
#
# In the normal case there will be no job_slots to release; so we expect the
# BoundedSemaphore to raise here. We're guarding against the abnormal case where
# there is a bug in the state machine implied by the execute_parallel code.
pass
spawner.join()

error = None
while True:
spawn_result = spawn_queue.get()
with spawned_jobs():
error = None
while True:
spawn_result = spawn_queue.get()

if isinstance(spawn_result, DoneSentinel):
if error:
raise error
return
if isinstance(spawn_result, DoneSentinel):
if error:
raise error
return

try:
if isinstance(spawn_result, SpawnError):
try:
se_result = handler.handle_spawn_error(spawn_result.item, spawn_result.error)
if se_result is not None:
yield se_result
except Exception as e:
# Fail fast and proceed to kill all outstanding spawned jobs.
stop.set()
error = e
elif (
error is not None
): # I.E.: `item` is not an exception, but there was a prior exception.
spawn_result.spawned_job.kill()
else:
try:
yield spawn_result.spawned_job.await_result()
except Job.Error as e:
try:
if isinstance(spawn_result, SpawnError):
try:
je_result = handler.handle_job_error(spawn_result.item, e)
if je_result is not None:
yield je_result
se_result = handler.handle_spawn_error(
spawn_result.item, spawn_result.error
)
if se_result is not None:
yield se_result
except Exception as e:
# Fail fast and proceed to kill all outstanding spawned jobs.
stop.set()
error = e
finally:
job_slots.release()
elif (
error is not None
): # I.E.: `item` is not an exception, but there was a prior exception.
spawn_result.spawned_job.kill()
else:
try:
yield spawn_result.spawned_job.await_result()
except Job.Error as e:
try:
je_result = handler.handle_job_error(spawn_result.item, e)
if je_result is not None:
yield je_result
except Exception as e:
# Fail fast and proceed to kill all outstanding spawned jobs.
stop.set()
error = e
finally:
job_slots.release()
23 changes: 22 additions & 1 deletion tests/integration/test_issue_1336.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import subprocess

from pex.compatibility import commonpath
from pex.testing import PY310, ensure_python_interpreter, run_pex_command
from pex.typing import TYPE_CHECKING

Expand All @@ -16,4 +17,24 @@ def test_pip_leak(tmpdir):
python = ensure_python_interpreter(PY310)
pip = os.path.join(os.path.dirname(python), "pip")
subprocess.check_call(args=[pip, "install", "setuptools_scm==6.0.1"])
run_pex_command(args=["--python", python, "bitstring==3.1.7"], python=python).assert_success()

pex_root = os.path.join(str(tmpdir), "pex_root")
result = run_pex_command(
args=[
"--pex-root",
pex_root,
"--runtime-pex-root",
pex_root,
"--python",
python,
"bitstring==3.1.7",
"--",
"-c",
"import bitstring, os; print(os.path.realpath(bitstring.__file__))",
],
python=python,
)
result.assert_success()
assert os.path.realpath(pex_root) == commonpath(
[os.path.realpath(pex_root), result.output.strip()]
)

0 comments on commit 5c7075a

Please sign in to comment.