Skip to content

Commit

Permalink
rt: fix LocalSet drop in thread local (#5179)
Browse files Browse the repository at this point in the history
`LocalSet` cleans up any tasks that have not yet been completed when it is
dropped. Previously, this cleanup process required access to a thread-local.
Suppose a `LocalSet` is stored in a thread-local itself. In that case, when it is
dropped, there is no guarantee the drop implementation will be able to
access the internal `LocalSet` thread-local as it may already have been
destroyed.

The internal `LocalSet` thread local is mainly used to avoid writing unsafe
code. All `LocalState` that cannot be moved across threads is stored in the
thread-local and accessed on demand.

This patch moves this local-only state into the `LocalSet`'s "shared" struct.
Because this struct *is* `Send`, the local-only state is stored in `UnsafeCell`,
and callers must ensure not to touch it from other threads.

A debug assertion is added to enforce this requirement in tests.

Fixes #5162
  • Loading branch information
carllerche authored Nov 10, 2022
1 parent 9e3fb16 commit b7812c8
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 125 deletions.
6 changes: 3 additions & 3 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::loom::thread::AccessError;
use crate::runtime::coop;

use std::cell::Cell;
Expand Down Expand Up @@ -63,12 +64,11 @@ pub(crate) fn thread_rng_n(n: u32) -> u32 {
CONTEXT.with(|ctx| ctx.rng.fastrand_n(n))
}

pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> R {
CONTEXT.with(|ctx| f(&ctx.budget))
pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError> {
CONTEXT.try_with(|ctx| f(&ctx.budget))
}

cfg_rt! {
use crate::loom::thread::AccessError;
use crate::runtime::TryCurrentError;

use std::fmt;
Expand Down
38 changes: 21 additions & 17 deletions tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

use crate::runtime::context;

use std::cell::Cell;

/// Opaque type tracking the amount of "work" a task may still do before
/// yielding back to the scheduler.
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -79,37 +77,42 @@ pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {

#[inline(always)]
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
struct ResetGuard<'a> {
cell: &'a Cell<Budget>,
struct ResetGuard {
prev: Budget,
}

impl<'a> Drop for ResetGuard<'a> {
impl Drop for ResetGuard {
fn drop(&mut self) {
self.cell.set(self.prev);
let _ = context::budget(|cell| {
cell.set(self.prev);
});
}
}

context::budget(|cell| {
#[allow(unused_variables)]
let maybe_guard = context::budget(|cell| {
let prev = cell.get();

cell.set(budget);

let _guard = ResetGuard { cell, prev };
ResetGuard { prev }
});

f()
})
// The function is called regardless even if the budget is not successfully
// set due to the thread-local being destroyed.
f()
}

#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
context::budget(|cell| cell.get().has_remaining())
// If the current budget cannot be accessed due to the thread-local being
// shutdown, then we assume there is budget remaining.
context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
}

cfg_rt_multi_thread! {
/// Sets the current task's budget.
pub(crate) fn set(budget: Budget) {
context::budget(|cell| cell.set(budget))
let _ = context::budget(|cell| cell.set(budget));
}
}

Expand All @@ -122,11 +125,12 @@ cfg_rt! {
let prev = cell.get();
cell.set(Budget::unconstrained());
prev
})
}).unwrap_or(Budget::unconstrained())
}
}

cfg_coop! {
use std::cell::Cell;
use std::task::{Context, Poll};

#[must_use]
Expand All @@ -144,7 +148,7 @@ cfg_coop! {
// They are both represented as the remembered budget being unconstrained.
let budget = self.0.get();
if !budget.is_unconstrained() {
context::budget(|cell| {
let _ = context::budget(|cell| {
cell.set(budget);
});
}
Expand Down Expand Up @@ -176,7 +180,7 @@ cfg_coop! {
cx.waker().wake_by_ref();
Poll::Pending
}
})
}).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
}

impl Budget {
Expand Down Expand Up @@ -209,7 +213,7 @@ mod test {
use wasm_bindgen_test::wasm_bindgen_test as test;

fn get() -> Budget {
context::budget(|cell| cell.get())
context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained())
}

#[test]
Expand Down
Loading

0 comments on commit b7812c8

Please sign in to comment.