Skip to content

Commit

Permalink
pythongh-123940: Ensure force-terminated daemon threads can be joined
Browse files Browse the repository at this point in the history
During finalization, daemon threads are force to exit immediately (without returning
through the call-stack normally) upon acquiring the GIL. Finalizers that run after
this must be able to join the forcefully terminated threads.

The current implemenation notified of thread completetion before returning
from `thread_run`. This code will never execute if the thread is forced to exit during
finalization. Any code that attempts to join such a thread will block indefinitely.

To fix this, use the old approach of notifying of thread completion when the thread state is
cleared. This happens both when `thread_run` exits normally and when thread states are
destroyed as part of finalization (which happens immediately after forcing daemon threads
to exit, before any python code can run).
  • Loading branch information
mpage committed Sep 17, 2024
1 parent a9c2bc1 commit f6c2da0
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 18 deletions.
3 changes: 3 additions & 0 deletions Include/cpython/pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ struct _ts {
The PyThreadObject must hold the only reference to this value.
*/
PyObject *threading_local_sentinel;

/* Set when the thread is about to exit */
struct _PyEventRc *thread_is_exiting;
};

#ifdef Py_DEBUG
Expand Down
10 changes: 10 additions & 0 deletions Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt);
PyAPI_FUNC(int)
PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns, int detach);

// A one-time event notification with reference counting
typedef struct _PyEventRc {
PyEvent event;
Py_ssize_t refcount;
} _PyEventRc;

extern _PyEventRc *_PyEventRc_New(void);
extern void _PyEventRc_Incref(_PyEventRc *erc);
extern void _PyEventRc_Decref(_PyEventRc *erc);

// _PyRawMutex implements a word-sized mutex that that does not depend on the
// parking lot API, and therefore can be used in the parking lot
// implementation.
Expand Down
3 changes: 3 additions & 0 deletions Include/internal/pycore_pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ extern void _PyThreadState_Bind(PyThreadState *tstate);
PyAPI_FUNC(PyThreadState *) _PyThreadState_NewBound(
PyInterpreterState *interp,
int whence);
extern PyThreadState *
_PyThreadState_NewWithEvent(PyInterpreterState *interp, int whence,
_PyEventRc *exiting_event);
extern PyThreadState * _PyThreadState_RemoveExcept(PyThreadState *tstate);
extern void _PyThreadState_DeleteList(PyThreadState *list);
extern void _PyThreadState_ClearMimallocHeaps(PyThreadState *tstate);
Expand Down
33 changes: 33 additions & 0 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,39 @@ def __del__(self):
self.assertEqual(out.strip(), b"OK")
self.assertIn(b"can't create new thread at interpreter shutdown", err)

@unittest.skipIf(support.Py_GIL_DISABLED, "daemon threads don't force exit")
def test_join_force_terminated_daemon_thread_in_finalization(self):
# gh-123940: Py_Finalize() forces all daemon threads to exit
# immediately (without unwinding the stack) upon acquiring the
# GIL. Finalizers that run after this must be able to join the daemon
# threads that were forced to exit.
code = textwrap.dedent("""
import threading
def loop():
while True:
pass
class Cycle:
def __init__(self):
self.self_ref = self
self.thr = threading.Thread(target=loop, daemon=True)
self.thr.start()
def __del__(self):
self.thr.join()
# Cycle holds a reference to itself, which ensures it is cleaned
# up during the GC that runs after daemon threads have been
# forced to exit during finalization.
Cycle()
""")
rc, out, err = assert_python_ok("-c", code)
self.assertEqual(err, b"")


class ThreadJoinOnShutdown(BaseTestCase):

def _run_and_join(self, script):
Expand Down
34 changes: 23 additions & 11 deletions Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ typedef struct {
// thread is about to exit. This is used to avoid false positives when
// detecting self-join attempts. See the comment in `ThreadHandle_join()`
// for a more detailed explanation.
PyEvent thread_is_exiting;
_PyEventRc *thread_is_exiting;

// Serializes calls to `join` and `set_done`.
_PyOnceFlag once;
Expand Down Expand Up @@ -174,6 +174,12 @@ remove_from_shutdown_handles(ThreadHandle *handle)
static ThreadHandle *
ThreadHandle_new(void)
{
_PyEventRc *exiting = _PyEventRc_New();
if (exiting == NULL) {
PyErr_NoMemory();
return NULL;
}

ThreadHandle *self =
(ThreadHandle *)PyMem_RawCalloc(1, sizeof(ThreadHandle));
if (self == NULL) {
Expand All @@ -183,7 +189,7 @@ ThreadHandle_new(void)
self->ident = 0;
self->os_handle = 0;
self->has_os_handle = 0;
self->thread_is_exiting = (PyEvent){0};
self->thread_is_exiting = exiting;
self->mutex = (PyMutex){_Py_UNLOCKED};
self->once = (_PyOnceFlag){0};
self->state = THREAD_HANDLE_NOT_STARTED;
Expand Down Expand Up @@ -226,6 +232,8 @@ ThreadHandle_decref(ThreadHandle *self)
return;
}

_PyEventRc_Decref(self->thread_is_exiting);

// Remove ourself from the global list of handles
HEAD_LOCK(&_PyRuntime);
if (self->node.next != NULL) {
Expand Down Expand Up @@ -268,7 +276,7 @@ _PyThread_AfterFork(struct _pythread_runtime_state *state)
handle->state = THREAD_HANDLE_DONE;
handle->once = (_PyOnceFlag){_Py_ONCE_INITIALIZED};
handle->mutex = (PyMutex){_Py_UNLOCKED};
_PyEvent_Notify(&handle->thread_is_exiting);
_PyEvent_Notify(&handle->thread_is_exiting->event);
llist_remove(node);
remove_from_shutdown_handles(handle);
}
Expand Down Expand Up @@ -357,8 +365,6 @@ thread_run(void *boot_raw)
exit:
// Don't need to wait for this thread anymore
remove_from_shutdown_handles(handle);

_PyEvent_Notify(&handle->thread_is_exiting);
ThreadHandle_decref(handle);

// bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with
Expand All @@ -371,7 +377,7 @@ static int
force_done(ThreadHandle *handle)
{
assert(get_thread_handle_state(handle) == THREAD_HANDLE_STARTING);
_PyEvent_Notify(&handle->thread_is_exiting);
_PyEvent_Notify(&handle->thread_is_exiting->event);
set_thread_handle_state(handle, THREAD_HANDLE_DONE);
return 0;
}
Expand Down Expand Up @@ -402,7 +408,8 @@ ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args,
goto start_failed;
}
PyInterpreterState *interp = _PyInterpreterState_GET();
boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING);
boot->tstate = _PyThreadState_NewWithEvent(
interp, _PyThreadState_WHENCE_THREADING, self->thread_is_exiting);
if (boot->tstate == NULL) {
PyMem_RawFree(boot);
if (!PyErr_Occurred()) {
Expand Down Expand Up @@ -492,7 +499,7 @@ ThreadHandle_join(ThreadHandle *self, PyTime_t timeout_ns)
// To work around this, we set `thread_is_exiting` immediately before
// `thread_run` returns. We can be sure that we are not attempting to join
// ourselves if the handle's thread is about to exit.
if (!_PyEvent_IsSet(&self->thread_is_exiting) &&
if (!_PyEvent_IsSet(&self->thread_is_exiting->event) &&
ThreadHandle_ident(self) == PyThread_get_thread_ident_ex()) {
// PyThread_join_thread() would deadlock or error out.
PyErr_SetString(ThreadError, "Cannot join current thread");
Expand All @@ -502,7 +509,8 @@ ThreadHandle_join(ThreadHandle *self, PyTime_t timeout_ns)
// Wait until the deadline for the thread to exit.
PyTime_t deadline = timeout_ns != -1 ? _PyDeadline_Init(timeout_ns) : 0;
int detach = 1;
while (!PyEvent_WaitTimed(&self->thread_is_exiting, timeout_ns, detach)) {
while (!PyEvent_WaitTimed(&self->thread_is_exiting->event, timeout_ns,
detach)) {
if (deadline) {
// _PyDeadline_Get will return a negative value if the deadline has
// been exceeded.
Expand Down Expand Up @@ -537,7 +545,7 @@ set_done(ThreadHandle *handle)
PyErr_SetString(ThreadError, "failed detaching handle");
return -1;
}
_PyEvent_Notify(&handle->thread_is_exiting);
_PyEvent_Notify(&handle->thread_is_exiting->event);
set_thread_handle_state(handle, THREAD_HANDLE_DONE);
return 0;
}
Expand Down Expand Up @@ -649,7 +657,7 @@ static PyObject *
PyThreadHandleObject_is_done(PyThreadHandleObject *self,
PyObject *Py_UNUSED(ignored))
{
if (_PyEvent_IsSet(&self->handle->thread_is_exiting)) {
if (_PyEvent_IsSet(&self->handle->thread_is_exiting->event)) {
Py_RETURN_TRUE;
}
else {
Expand Down Expand Up @@ -2419,6 +2427,10 @@ thread__make_thread_handle(PyObject *module, PyObject *identobj)
if (PyErr_Occurred()) {
return NULL;
}
_PyEventRc *exiting_event = _PyEventRc_New();
if (exiting_event == NULL) {
return NULL;
}
PyThreadHandleObject *hobj =
PyThreadHandleObject_new(state->thread_handle_type);
if (hobj == NULL) {
Expand Down
24 changes: 24 additions & 0 deletions Python/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,30 @@ PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns, int detach)
}
}

_PyEventRc *
_PyEventRc_New(void)
{
_PyEventRc *erc = (_PyEventRc *)PyMem_RawCalloc(1, sizeof(_PyEventRc));
if (erc != NULL) {
erc->refcount = 1;
}
return erc;
}

void
_PyEventRc_Incref(_PyEventRc *erc)
{
_Py_atomic_add_ssize(&erc->refcount, 1);
}

void
_PyEventRc_Decref(_PyEventRc *erc)
{
if (_Py_atomic_add_ssize(&erc->refcount, -1) == 1) {
PyMem_RawFree(erc);
}
}

static int
unlock_once(_PyOnceFlag *o, int res)
{
Expand Down
48 changes: 41 additions & 7 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1458,15 +1458,16 @@ free_threadstate(_PyThreadStateImpl *tstate)
*/

static void
init_threadstate(_PyThreadStateImpl *_tstate,
PyInterpreterState *interp, uint64_t id, int whence)
init_threadstate(_PyThreadStateImpl *_tstate, PyInterpreterState *interp,
uint64_t id, int whence, _PyEventRc *exiting_event)
{
PyThreadState *tstate = (PyThreadState *)_tstate;
if (tstate->_status.initialized) {
Py_FatalError("thread state already initialized");
}

assert(interp != NULL);
tstate->thread_is_exiting = exiting_event;
tstate->interp = interp;
tstate->eval_breaker =
_Py_atomic_load_uintptr_relaxed(&interp->ceval.instrumentation_version);
Expand Down Expand Up @@ -1530,8 +1531,19 @@ add_threadstate(PyInterpreterState *interp, PyThreadState *tstate,
interp->threads.head = tstate;
}

static _PyEventRc *
ensure_event(_PyEventRc *exiting_event)
{
if (exiting_event != NULL) {
_PyEventRc_Incref(exiting_event);
return exiting_event;
}
return _PyEventRc_New();
}

static PyThreadState *
new_threadstate(PyInterpreterState *interp, int whence)
new_threadstate(PyInterpreterState *interp, int whence,
_PyEventRc *exiting_event)
{
_PyThreadStateImpl *tstate;
_PyRuntimeState *runtime = interp->runtime;
Expand All @@ -1544,10 +1556,18 @@ new_threadstate(PyInterpreterState *interp, int whence)
if (new_tstate == NULL) {
return NULL;
}

exiting_event = ensure_event(exiting_event);
if (exiting_event == NULL) {
PyMem_RawFree(new_tstate);
return NULL;
}

#ifdef Py_GIL_DISABLED
Py_ssize_t qsbr_idx = _Py_qsbr_reserve(interp);
if (qsbr_idx < 0) {
PyMem_RawFree(new_tstate);
_PyEventRc_Decref(exiting_event);
return NULL;
}
#endif
Expand Down Expand Up @@ -1578,7 +1598,7 @@ new_threadstate(PyInterpreterState *interp, int whence)
sizeof(*tstate));
}

init_threadstate(tstate, interp, id, whence);
init_threadstate(tstate, interp, id, whence, exiting_event);
add_threadstate(interp, (PyThreadState *)tstate, old_head);

HEAD_UNLOCK(runtime);
Expand Down Expand Up @@ -1606,7 +1626,7 @@ PyThreadState_New(PyInterpreterState *interp)
PyThreadState *
_PyThreadState_NewBound(PyInterpreterState *interp, int whence)
{
PyThreadState *tstate = new_threadstate(interp, whence);
PyThreadState *tstate = new_threadstate(interp, whence, NULL);
if (tstate) {
bind_tstate(tstate);
// This makes sure there's a gilstate tstate bound
Expand All @@ -1622,7 +1642,14 @@ _PyThreadState_NewBound(PyInterpreterState *interp, int whence)
PyThreadState *
_PyThreadState_New(PyInterpreterState *interp, int whence)
{
return new_threadstate(interp, whence);
return new_threadstate(interp, whence, NULL);
}

PyThreadState *
_PyThreadState_NewWithEvent(PyInterpreterState *interp, int whence,
_PyEventRc *exiting_event)
{
return new_threadstate(interp, whence, exiting_event);
}

// We keep this for stable ABI compabibility.
Expand Down Expand Up @@ -1732,6 +1759,13 @@ PyThreadState_Clear(PyThreadState *tstate)

Py_CLEAR(tstate->context);

if (tstate->thread_is_exiting != NULL) {
_PyEventRc *erc = tstate->thread_is_exiting;
tstate->thread_is_exiting = NULL;
_PyEvent_Notify(&erc->event);
_PyEventRc_Decref(erc);
}

#ifdef Py_GIL_DISABLED
// Each thread should clear own freelists in free-threading builds.
struct _Py_freelists *freelists = _Py_freelists_GET();
Expand Down Expand Up @@ -2760,7 +2794,7 @@ PyGILState_Ensure(void)
/* Create a new Python thread state for this thread */
// XXX Use PyInterpreterState_EnsureThreadState()?
tcur = new_threadstate(runtime->gilstate.autoInterpreterState,
_PyThreadState_WHENCE_GILSTATE);
_PyThreadState_WHENCE_GILSTATE, NULL);
if (tcur == NULL) {
Py_FatalError("Couldn't create thread-state for new thread");
}
Expand Down

0 comments on commit f6c2da0

Please sign in to comment.