Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-114271: Make _thread.ThreadHandle thread-safe in free-threaded builds #115190

Merged
merged 21 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ typedef struct {
uint8_t v;
} PyEvent;

// Check if the event is set without blocking. Returns 1 if the event is set or
// 0 otherwise.
PyAPI_FUNC(int) _PyEvent_IsSet(PyEvent *evt);

// Set the event and notify any waiting threads.
// Export for '_testinternalcapi' shared extension
PyAPI_FUNC(void) _PyEvent_Notify(PyEvent *evt);
Expand All @@ -148,6 +152,15 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt);
// and 0 if the timeout expired or thread was interrupted.
PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, _PyTime_t timeout_ns);

// A one-time event notification with reference counting.
typedef struct _PyEventRc {
PyEvent event;
Py_ssize_t refcount;
} _PyEventRc;

_PyEventRc *_PyEventRc_New(void);
void _PyEventRc_Incref(_PyEventRc *erc);
void _PyEventRc_Decref(_PyEventRc *erc);

// _PyRawMutex implements a word-sized mutex that that does not depend on the
// parking lot API, and therefore can be used in the parking lot
Expand Down
90 changes: 46 additions & 44 deletions Lib/test/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ def task():
with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(task)
handle.join()
with self.assertRaisesRegex(ValueError, "not joinable"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting... our docs already claim that threads can be joined multiple times. So I wonder why this existing test logic was previously explicitly checking for an error here.

In this sense this change aligns with what our docs claim so a behavior change here could be seen as a bugfix. I do not expect anyone to be depending on subsequent join()s of a thread raising regardless.

handle.join()
# Subsequent join() calls should succeed
handle.join()

def test_joinable_not_joined(self):
handle_destroyed = thread.allocate_lock()
Expand Down Expand Up @@ -233,58 +233,60 @@ def task():
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
raise errors[0]

def test_detach_from_self(self):
errors = []
handles = []
start_joinable_thread_returned = thread.allocate_lock()
start_joinable_thread_returned.acquire()
thread_detached = thread.allocate_lock()
thread_detached.acquire()
def test_join_then_self_join(self):
# make sure we can't deadlock in the following scenario with
mpage marked this conversation as resolved.
Show resolved Hide resolved
# threads t0 and t1:
#
# - t0 joins t1
# - t1 self joins
def make_lock():
lock = thread.allocate_lock()
lock.acquire()
return lock

error = None
self_joiner_handle = None
self_joiner_started = make_lock()
self_joiner_barrier = make_lock()
def self_joiner():
nonlocal error

self_joiner_started.release()
self_joiner_barrier.acquire()

def task():
start_joinable_thread_returned.acquire()
try:
handles[0].detach()
self_joiner_handle.join()
except Exception as e:
errors.append(e)
finally:
thread_detached.release()
error = e

joiner_started = make_lock()
def joiner():
joiner_started.release()
self_joiner_handle.join()

with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(task)
handles.append(handle)
start_joinable_thread_returned.release()
thread_detached.acquire()
with self.assertRaisesRegex(ValueError, "not joinable"):
handle.join()
self_joiner_handle = thread.start_joinable_thread(self_joiner)
# Wait for the self-joining thread to start
self_joiner_started.acquire()

assert len(errors) == 0
# Start the thread that joins the self-joiner
joiner_handle = thread.start_joinable_thread(joiner)

def test_detach_then_join(self):
lock = thread.allocate_lock()
lock.acquire()
# Wait for the joiner to start
joiner_started.acquire()

def task():
lock.acquire()
# Not great, but I don't think there's a deterministic way to make
gpshead marked this conversation as resolved.
Show resolved Hide resolved
# sure that the self-joining thread has been joined.
time.sleep(0.1)

with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(task)
# detach() returns even though the thread is blocked on lock
handle.detach()
# join() then cannot be called anymore
with self.assertRaisesRegex(ValueError, "not joinable"):
handle.join()
lock.release()

def test_join_then_detach(self):
def task():
pass
# Unblock the self-joiner
self_joiner_barrier.release()

with threading_helper.wait_threads_exit():
handle = thread.start_joinable_thread(task)
handle.join()
with self.assertRaisesRegex(ValueError, "not joinable"):
handle.detach()
self_joiner_handle.join()
joiner_handle.join()

with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
raise error


class Barrier:
Expand Down
23 changes: 7 additions & 16 deletions Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,14 +956,11 @@ def _after_fork(self, new_ident=None):
if self._tstate_lock is not None:
self._tstate_lock._at_fork_reinit()
self._tstate_lock.acquire()
if self._join_lock is not None:
self._join_lock._at_fork_reinit()
else:
# This thread isn't alive after fork: it doesn't have a tstate
# anymore.
self._is_stopped = True
self._tstate_lock = None
self._join_lock = None
self._handle = None

def __repr__(self):
Expand Down Expand Up @@ -996,8 +993,6 @@ def start(self):
if self._started.is_set():
raise RuntimeError("threads can only be started once")

self._join_lock = _allocate_lock()
mpage marked this conversation as resolved.
Show resolved Hide resolved

with _active_limbo_lock:
_limbo[self] = self
try:
Expand Down Expand Up @@ -1167,17 +1162,9 @@ def join(self, timeout=None):
self._join_os_thread()

def _join_os_thread(self):
join_lock = self._join_lock
if join_lock is None:
return
with join_lock:
# Calling join() multiple times would raise an exception
# in one of the callers.
if self._handle is not None:
self._handle.join()
self._handle = None
# No need to keep this around
self._join_lock = None
# self._handle may be cleared post-fork
if self._handle is not None:
self._handle.join()

def _wait_for_tstate_lock(self, block=True, timeout=-1):
# Issue #18808: wait for the thread state to be gone.
Expand Down Expand Up @@ -1478,6 +1465,10 @@ def __init__(self):
with _active_limbo_lock:
_active[self._ident] = self

def _join_os_thread(self):
# No ThreadHandle for main thread
pass


# Helper thread-local instance to detect when a _DummyThread
# is collected. Not a part of the public API.
Expand Down
Loading
Loading