Skip to content

Commit

Permalink
Rollup merge of #95801 - m-ou-se:futex-rwlock, r=Amanieu
Browse files Browse the repository at this point in the history
Replace RwLock by a futex based one on Linux

This replaces the pthread-based RwLock on Linux by a futex based one.

This implementation is similar to [the algorithm](https://gist.github.com/kprotty/3042436aa55620d8ebcddf2bf25668bc) suggested by `@kprotty,` but modified to prefer writers and spin before sleeping. It uses two futexes: One for the readers to wait on, and one for the writers to wait on. The readers futex contains the state of the RwLock: The number of readers, a bit indicating whether writers are waiting, and a bit indicating whether readers are waiting. The writers futex is used as a simple condition variable and its contents are meaningless; it just needs to be changed on every notification.

Using two futexes rather than one has the obvious advantage of allowing a separate queue for readers and writers, but it also means we avoid the problem a single-futex RwLock would have of making it hard for a writer to go to sleep while the number of readers is rapidly changing up and down, as the writers futex is only changed when we actually want to wake up a writer.

It always prefers writers, as we decided [here](#93740 (comment)).

To be able to prefer writers, it relies on futex_wake to return the number of awoken threads to be able to handle write-unlocking while both the readers-waiting and writers-waiting bits are set. Instead of waking both and letting them race, it first wakes writers and only continues to wake the readers too if futex_wake reported there were no writers to wake up.

r? `@Amanieu`
  • Loading branch information
Dylan-DPC authored Apr 11, 2022
2 parents 2ad701e + 8339381 commit a15ac30
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 8 deletions.
20 changes: 14 additions & 6 deletions library/std/src/sys/unix/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
use crate::sync::atomic::AtomicI32;
use crate::time::Duration;

/// Wait for a futex_wake operation to wake us.
///
/// Returns directly if the futex doesn't hold the expected value.
///
/// Returns false on timeout, and true in all other cases.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) -> bool {
use super::time::Timespec;
Expand Down Expand Up @@ -68,18 +73,23 @@ pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) {
}
}

/// Wake up one thread that's blocked on futex_wait on this futex.
///
/// Returns true if this actually woke up such a thread,
/// or false if no thread was waiting on this futex.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn futex_wake(futex: &AtomicI32) {
pub fn futex_wake(futex: &AtomicI32) -> bool {
unsafe {
libc::syscall(
libc::SYS_futex,
futex as *const AtomicI32,
libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
1,
);
) > 0
}
}

/// Wake up all threads that are waiting on futex_wait on this futex.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn futex_wake_all(futex: &AtomicI32) {
unsafe {
Expand All @@ -93,12 +103,10 @@ pub fn futex_wake_all(futex: &AtomicI32) {
}

#[cfg(target_os = "emscripten")]
pub fn futex_wake(futex: &AtomicI32) {
pub fn futex_wake(futex: &AtomicI32) -> bool {
extern "C" {
fn emscripten_futex_wake(addr: *const AtomicI32, count: libc::c_int) -> libc::c_int;
}

unsafe {
emscripten_futex_wake(futex as *const AtomicI32, 1);
}
unsafe { emscripten_futex_wake(futex as *const AtomicI32, 1) > 0 }
}
313 changes: 313 additions & 0 deletions library/std/src/sys/unix/locks/futex_rwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
use crate::sync::atomic::{
AtomicI32,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};

pub type MovableRwLock = RwLock;

pub struct RwLock {
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
// Bits 0..30:
// 0: Unlocked
// 1..=0x3FFF_FFFE: Locked by N readers
// 0x3FFF_FFFF: Write locked
// Bit 30: Readers are waiting on this futex.
// Bit 31: Writers are waiting on the writer_notify futex.
state: AtomicI32,
// The 'condition variable' to notify writers through.
// Incremented on every signal.
writer_notify: AtomicI32,
}

const READ_LOCKED: i32 = 1;
const MASK: i32 = (1 << 30) - 1;
const WRITE_LOCKED: i32 = MASK;
const MAX_READERS: i32 = MASK - 1;
const READERS_WAITING: i32 = 1 << 30;
const WRITERS_WAITING: i32 = 1 << 31;

fn is_unlocked(state: i32) -> bool {
state & MASK == 0
}

fn is_write_locked(state: i32) -> bool {
state & MASK == WRITE_LOCKED
}

fn has_readers_waiting(state: i32) -> bool {
state & READERS_WAITING != 0
}

fn has_writers_waiting(state: i32) -> bool {
state & WRITERS_WAITING != 0
}

fn is_read_lockable(state: i32) -> bool {
// This also returns false if the counter could overflow if we tried to read lock it.
//
// We don't allow read-locking if there's readers waiting, even if the lock is unlocked
// and there's no writers waiting. The only situation when this happens is after unlocking,
// at which point the unlocking thread might be waking up writers, which have priority over readers.
// The unlocking thread will clear the readers waiting bit and wake up readers, if necssary.
state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
}

fn has_reached_max_readers(state: i32) -> bool {
state & MASK == MAX_READERS
}

impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicI32::new(0), writer_notify: AtomicI32::new(0) }
}

#[inline]
pub unsafe fn destroy(&self) {}

#[inline]
pub unsafe fn try_read(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
.is_ok()
}

#[inline]
pub unsafe fn read(&self) {
let state = self.state.load(Relaxed);
if !is_read_lockable(state)
|| self
.state
.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
.is_err()
{
self.read_contended();
}
}

#[inline]
pub unsafe fn read_unlock(&self) {
let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;

// It's impossible for a reader to be waiting on a read-locked RwLock,
// except if there is also a writer waiting.
debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));

// Wake up a writer if we were the last reader and there's a writer waiting.
if is_unlocked(state) && has_writers_waiting(state) {
self.wake_writer_or_readers(state);
}
}

#[cold]
fn read_contended(&self) {
let mut state = self.spin_read();

loop {
// If we can lock it, lock it.
if is_read_lockable(state) {
match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
{
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}

// Check for overflow.
if has_reached_max_readers(state) {
panic!("too many active read locks on RwLock");
}

// Make sure the readers waiting bit is set before we go to sleep.
if !has_readers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}

// Wait for the state to change.
futex_wait(&self.state, state | READERS_WAITING, None);

// Spin again after waking up.
state = self.spin_read();
}
}

#[inline]
pub unsafe fn try_write(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
.is_ok()
}

#[inline]
pub unsafe fn write(&self) {
if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
self.write_contended();
}
}

#[inline]
pub unsafe fn write_unlock(&self) {
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;

debug_assert!(is_unlocked(state));

if has_writers_waiting(state) || has_readers_waiting(state) {
self.wake_writer_or_readers(state);
}
}

#[cold]
fn write_contended(&self) {
let mut state = self.spin_write();

let mut other_writers_waiting = 0;

loop {
// If it's unlocked, we try to lock it.
if is_unlocked(state) {
match self.state.compare_exchange_weak(
state,
state | WRITE_LOCKED | other_writers_waiting,
Acquire,
Relaxed,
) {
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}

// Set the waiting bit indicating that we're waiting on it.
if !has_writers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}

// Other writers might be waiting now too, so we should make sure
// we keep that bit on once we manage lock it.
other_writers_waiting = WRITERS_WAITING;

// Examine the notification counter before we check if `state` has changed,
// to make sure we don't miss any notifications.
let seq = self.writer_notify.load(Acquire);

// Don't go to sleep if the lock has become available,
// or if the writers waiting bit is no longer set.
let s = self.state.load(Relaxed);
if is_unlocked(state) || !has_writers_waiting(s) {
state = s;
continue;
}

// Wait for the state to change.
futex_wait(&self.writer_notify, seq, None);

// Spin again after waking up.
state = self.spin_write();
}
}

/// Wake up waiting threads after unlocking.
///
/// If both are waiting, this will wake up only one writer, but will fall
/// back to waking up readers if there was no writer to wake up.
#[cold]
fn wake_writer_or_readers(&self, mut state: i32) {
assert!(is_unlocked(state));

// The readers waiting bit might be turned on at any point now,
// since readers will block when there's anything waiting.
// Writers will just lock the lock though, regardless of the waiting bits,
// so we don't have to worry about the writer waiting bit.
//
// If the lock gets locked in the meantime, we don't have to do
// anything, because then the thread that locked the lock will take
// care of waking up waiters when it unlocks.

// If only writers are waiting, wake one of them up.
if state == WRITERS_WAITING {
match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
Ok(_) => {
self.wake_writer();
return;
}
Err(s) => {
// Maybe some readers are now waiting too. So, continue to the next `if`.
state = s;
}
}
}

// If both writers and readers are waiting, leave the readers waiting
// and only wake up one writer.
if state == READERS_WAITING + WRITERS_WAITING {
if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
// The lock got locked. Not our problem anymore.
return;
}
if self.wake_writer() {
return;
}
// No writers were actually blocked on futex_wait, so we continue
// to wake up readers instead, since we can't be sure if we notified a writer.
state = READERS_WAITING;
}

// If readers are waiting, wake them all up.
if state == READERS_WAITING {
if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
futex_wake_all(&self.state);
}
}
}

/// This wakes one writer and returns true if we woke up a writer that was
/// blocked on futex_wait.
///
/// If this returns false, it might still be the case that we notified a
/// writer that was about to go to sleep.
fn wake_writer(&self) -> bool {
self.writer_notify.fetch_add(1, Release);
futex_wake(&self.writer_notify)
}

/// Spin for a while, but stop directly at the given condition.
fn spin_until(&self, f: impl Fn(i32) -> bool) -> i32 {
let mut spin = 100; // Chosen by fair dice roll.
loop {
let state = self.state.load(Relaxed);
if f(state) || spin == 0 {
return state;
}
crate::hint::spin_loop();
spin -= 1;
}
}

fn spin_write(&self) -> i32 {
// Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}

fn spin_read(&self) -> i32 {
// Stop spinning when it's unlocked or read locked, or when there's waiting threads.
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
})
}
}
4 changes: 2 additions & 2 deletions library/std/src/sys/unix/locks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ cfg_if::cfg_if! {
target_os = "android",
))] {
mod futex;
mod futex_rwlock;
#[allow(dead_code)]
mod pthread_mutex; // Only used for PthreadMutexAttr, needed by pthread_remutex.
mod pthread_remutex; // FIXME: Implement this using a futex
mod pthread_rwlock; // FIXME: Implement this using a futex
pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar};
pub use pthread_remutex::ReentrantMutex;
pub use pthread_rwlock::{RwLock, MovableRwLock};
pub use futex_rwlock::{RwLock, MovableRwLock};
} else {
mod pthread_mutex;
mod pthread_remutex;
Expand Down

0 comments on commit a15ac30

Please sign in to comment.