Skip to content

Commit

Permalink
pythongh-114271: Make _thread.ThreadHandle thread-safe in free-thre…
Browse files Browse the repository at this point in the history
…aded builds (pythonGH-115190)

Make `_thread.ThreadHandle` thread-safe in free-threaded builds

We protect the mutable state of `ThreadHandle` using a `_PyOnceFlag`.
Concurrent operations (i.e. `join` or `detach`) on `ThreadHandle` block
until it is their turn to execute or an earlier operation succeeds.
Once an operation has been applied successfully, all future operations
complete immediately.

The `join()` method is now idempotent. It may be called multiple times
but the underlying OS thread will only be joined once. After `join()`
succeeds, any future calls to `join()` will succeed immediately.

The internal thread handle `detach()` method has been removed.
  • Loading branch information
mpage authored and adorilson committed Mar 25, 2024
1 parent 5eb9239 commit d69e884
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 101 deletions.
13 changes: 13 additions & 0 deletions Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ typedef struct {
uint8_t v;
} PyEvent;

// Check if the event is set without blocking. Returns 1 if the event is set or
// 0 otherwise.
PyAPI_FUNC(int) _PyEvent_IsSet(PyEvent *evt);

// Set the event and notify any waiting threads.
// Export for '_testinternalcapi' shared extension
PyAPI_FUNC(void) _PyEvent_Notify(PyEvent *evt);
Expand All @@ -149,6 +153,15 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt);
// and 0 if the timeout expired or thread was interrupted.
PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns);

// A one-time event notification with reference counting.
typedef struct _PyEventRc {
PyEvent event;
Py_ssize_t refcount;
} _PyEventRc;

_PyEventRc *_PyEventRc_New(void);
void _PyEventRc_Incref(_PyEventRc *erc);
void _PyEventRc_Decref(_PyEventRc *erc);

// _PyRawMutex implements a word-sized mutex that does not depend on the
// parking lot API, and therefore can be used in the parking lot
Expand Down
91 changes: 47 additions & 44 deletions Lib/test/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ def task():
with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(task)
handle.join()
with self.assertRaisesRegex(ValueError, "not joinable"):
handle.join()
# Subsequent join() calls should succeed
handle.join()

def test_joinable_not_joined(self):
handle_destroyed = thread.allocate_lock()
Expand Down Expand Up @@ -233,58 +233,61 @@ def task():
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
raise errors[0]

def test_detach_from_self(self):
errors = []
handles = []
start_joinable_thread_returned = thread.allocate_lock()
start_joinable_thread_returned.acquire()
thread_detached = thread.allocate_lock()
thread_detached.acquire()
def test_join_then_self_join(self):
# make sure we can't deadlock in the following scenario with
# threads t0 and t1 (see comment in `ThreadHandle_join()` for more
# details):
#
# - t0 joins t1
# - t1 self joins
def make_lock():
lock = thread.allocate_lock()
lock.acquire()
return lock

error = None
self_joiner_handle = None
self_joiner_started = make_lock()
self_joiner_barrier = make_lock()
def self_joiner():
nonlocal error

self_joiner_started.release()
self_joiner_barrier.acquire()

def task():
start_joinable_thread_returned.acquire()
try:
handles[0].detach()
self_joiner_handle.join()
except Exception as e:
errors.append(e)
finally:
thread_detached.release()
error = e

joiner_started = make_lock()
def joiner():
joiner_started.release()
self_joiner_handle.join()

with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(task)
handles.append(handle)
start_joinable_thread_returned.release()
thread_detached.acquire()
with self.assertRaisesRegex(ValueError, "not joinable"):
handle.join()
self_joiner_handle = thread.start_joinable_thread(self_joiner)
# Wait for the self-joining thread to start
self_joiner_started.acquire()

assert len(errors) == 0
# Start the thread that joins the self-joiner
joiner_handle = thread.start_joinable_thread(joiner)

def test_detach_then_join(self):
lock = thread.allocate_lock()
lock.acquire()
# Wait for the joiner to start
joiner_started.acquire()

def task():
lock.acquire()
# Not great, but I don't think there's a deterministic way to make
# sure that the self-joining thread has been joined.
time.sleep(0.1)

with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(task)
# detach() returns even though the thread is blocked on lock
handle.detach()
# join() then cannot be called anymore
with self.assertRaisesRegex(ValueError, "not joinable"):
handle.join()
lock.release()

def test_join_then_detach(self):
def task():
pass
# Unblock the self-joiner
self_joiner_barrier.release()

with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(task)
handle.join()
with self.assertRaisesRegex(ValueError, "not joinable"):
handle.detach()
self_joiner_handle.join()
joiner_handle.join()

with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
raise error


class Barrier:
Expand Down
24 changes: 7 additions & 17 deletions Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,6 @@ class is implemented.
if _HAVE_THREAD_NATIVE_ID:
self._native_id = None
self._tstate_lock = None
self._join_lock = None
self._handle = None
self._started = Event()
self._is_stopped = False
Expand All @@ -956,14 +955,11 @@ def _after_fork(self, new_ident=None):
if self._tstate_lock is not None:
self._tstate_lock._at_fork_reinit()
self._tstate_lock.acquire()
if self._join_lock is not None:
self._join_lock._at_fork_reinit()
else:
# This thread isn't alive after fork: it doesn't have a tstate
# anymore.
self._is_stopped = True
self._tstate_lock = None
self._join_lock = None
self._handle = None

def __repr__(self):
Expand Down Expand Up @@ -996,8 +992,6 @@ def start(self):
if self._started.is_set():
raise RuntimeError("threads can only be started once")

self._join_lock = _allocate_lock()

with _active_limbo_lock:
_limbo[self] = self
try:
Expand Down Expand Up @@ -1167,17 +1161,9 @@ def join(self, timeout=None):
self._join_os_thread()

def _join_os_thread(self):
join_lock = self._join_lock
if join_lock is None:
return
with join_lock:
# Calling join() multiple times would raise an exception
# in one of the callers.
if self._handle is not None:
self._handle.join()
self._handle = None
# No need to keep this around
self._join_lock = None
# self._handle may be cleared post-fork
if self._handle is not None:
self._handle.join()

def _wait_for_tstate_lock(self, block=True, timeout=-1):
# Issue #18808: wait for the thread state to be gone.
Expand Down Expand Up @@ -1478,6 +1464,10 @@ def __init__(self):
with _active_limbo_lock:
_active[self._ident] = self

def _join_os_thread(self):
# No ThreadHandle for main thread
pass


# Helper thread-local instance to detect when a _DummyThread
# is collected. Not a part of the public API.
Expand Down
Loading

0 comments on commit d69e884

Please sign in to comment.