
Commit 950fee9: Merge #115 #116
115: Misc improvements r=Amanieu a=Amanieu



116: Make functions from parking_lot_core #[inline] r=Amanieu a=Amanieu

These are only ever called from #[inline(never)] functions, so there shouldn't be too much code duplication from monomorphization.

Co-authored-by: Amanieu d'Antras <[email protected]>
bors[bot] and Amanieu committed Feb 2, 2019
3 parents 02b1ed0 + 318d6f3 + 9df9119 commit 950fee9
Showing 13 changed files with 284 additions and 94 deletions.
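
The rationale in #116 — marking parking_lot_core's helpers #[inline] because they are only ever reached through #[inline(never)] functions — follows a common pattern: keep the fast path tiny and inlinable, funnel everything else into a single cold entry point, and let small #[inline] helpers be folded into that one place. A minimal sketch of the pattern, with hypothetical names (not code from this repository):

use std::sync::atomic::{AtomicBool, Ordering};

pub struct RawLock {
    locked: AtomicBool,
}

impl RawLock {
    #[inline]
    pub fn lock(&self) {
        // Fast path: one compare_exchange, inlined into every caller.
        if self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            self.lock_slow();
        }
    }

    // Cold path: emitted exactly once, so any #[inline] helpers it calls are
    // duplicated at most once rather than once per call site.
    #[cold]
    #[inline(never)]
    fn lock_slow(&self) {
        while !self.try_acquire() {
            std::hint::spin_loop();
        }
    }

    #[inline]
    fn try_acquire(&self) -> bool {
        self.locked
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }
}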
119 changes: 42 additions & 77 deletions core/src/parking_lot.rs
@@ -38,6 +38,7 @@ struct HashTable {
}

impl HashTable {
#[inline]
fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
@@ -69,6 +70,7 @@ struct Bucket {
}

impl Bucket {
#[inline]
pub fn new() -> Self {
Self {
mutex: WordLock::INIT,
@@ -83,6 +85,7 @@ impl Bucket {

// Implementation of Clone for Bucket, needed to make vec![] work
impl Clone for Bucket {
#[inline]
fn clone(&self) -> Self {
Self::new()
}
@@ -97,6 +100,7 @@ struct FairTimeout {
}

impl FairTimeout {
#[inline]
fn new() -> FairTimeout {
FairTimeout {
timeout: Instant::now(),
@@ -105,6 +109,7 @@ impl FairTimeout {
}

// Determine whether we should force a fair unlock, and update the timeout
#[inline]
fn should_timeout(&mut self) -> bool {
let now = Instant::now();
if now > self.timeout {
@@ -163,6 +168,7 @@ impl ThreadData {
}

// Invokes the given closure with a reference to the current thread `ThreadData`.
#[inline(always)]
fn with_thread_data<F, T>(f: F) -> T
where
F: FnOnce(&ThreadData) -> T,
@@ -196,32 +202,41 @@ impl Drop for ThreadData {
 }
 
 // Get a pointer to the latest hash table, creating one if it doesn't exist yet.
+#[inline]
 fn get_hashtable() -> *mut HashTable {
-    let mut table = HASHTABLE.load(Ordering::Acquire);
+    let table = HASHTABLE.load(Ordering::Acquire);
 
     // If there is no table, create one
     if table.is_null() {
-        let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
-
-        // If this fails then it means some other thread created the hash
-        // table first.
-        match HASHTABLE.compare_exchange(
-            ptr::null_mut(),
-            new_table,
-            Ordering::Release,
-            Ordering::Relaxed,
-        ) {
-            Ok(_) => return new_table,
-            Err(x) => table = x,
-        }
-
-        // Free the table we created
-        unsafe {
-            Box::from_raw(new_table);
-        }
+        create_hashtable()
+    } else {
+        table
     }
-
-    table
+}
+
+// Get a pointer to the latest hash table, creating one if it doesn't exist yet.
+#[cold]
+#[inline(never)]
+fn create_hashtable() -> *mut HashTable {
+    let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
+
+    // If this fails then it means some other thread created the hash
+    // table first.
+    match HASHTABLE.compare_exchange(
+        ptr::null_mut(),
+        new_table,
+        Ordering::Release,
+        Ordering::Relaxed,
+    ) {
+        Ok(_) => new_table,
+        Err(old_table) => {
+            // Free the table we created
+            unsafe {
+                Box::from_raw(new_table);
+            }
+            old_table
+        }
+    }
 }
 
 // Grow the hash table so that it is big enough for the given number of threads.
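
The hunk above splits the rarely-taken allocation path out of get_hashtable into a #[cold] #[inline(never)] create_hashtable, with compare_exchange resolving the race when two threads try to install a table at once: the loser frees its own allocation and uses the winner's. A self-contained sketch of the same initialize-once idiom, with a simplified payload type standing in for HashTable (not the crate's code):

use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

static TABLE: AtomicPtr<Vec<u32>> = AtomicPtr::new(ptr::null_mut());

#[inline]
fn get_table() -> *mut Vec<u32> {
    let table = TABLE.load(Ordering::Acquire);
    if table.is_null() {
        create_table()
    } else {
        table
    }
}

#[cold]
#[inline(never)]
fn create_table() -> *mut Vec<u32> {
    let new_table = Box::into_raw(Box::new(Vec::new()));
    match TABLE.compare_exchange(ptr::null_mut(), new_table, Ordering::Release, Ordering::Relaxed) {
        // We installed our allocation; every later load(Acquire) sees it.
        Ok(_) => new_table,
        // Another thread won the race: free our allocation and use theirs.
        Err(old_table) => {
            unsafe { drop(Box::from_raw(new_table)) };
            old_table
        }
    }
}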
@@ -312,15 +327,18 @@ unsafe fn grow_hashtable(num_threads: usize) {

// Hash function for addresses
#[cfg(target_pointer_width = "32")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}

// Lock the bucket for the given key
#[inline]
unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
let mut bucket;
loop {
@@ -345,6 +363,7 @@ unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {

// Lock the bucket for the given key, but check that the key hasn't been changed
// in the meantime due to a requeue.
#[inline]
unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
let mut bucket;
loop {
@@ -372,6 +391,7 @@ unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
}

// Lock the two buckets for the given pair of keys
#[inline]
unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
let mut bucket1;
loop {
@@ -412,6 +432,7 @@ unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Buc
}

// Unlock a pair of buckets
#[inline]
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
if bucket1 as *const _ == bucket2 as *const _ {
bucket1.mutex.unlock();
@@ -439,6 +460,7 @@ pub enum ParkResult {

impl ParkResult {
/// Returns true if we were unparked by another thread.
#[inline]
pub fn is_unparked(self) -> bool {
if let ParkResult::Unparked(_) = self {
true
@@ -558,28 +580,6 @@ where
     B: FnOnce(),
     T: FnOnce(usize, bool),
 {
-    let mut v = Some(validate);
-    let mut b = Some(before_sleep);
-    let mut t = Some(timed_out);
-    park_internal(
-        key,
-        &mut || v.take().unchecked_unwrap()(),
-        &mut || b.take().unchecked_unwrap()(),
-        &mut |key, was_last_thread| t.take().unchecked_unwrap()(key, was_last_thread),
-        park_token,
-        timeout,
-    )
-}
-
-// Non-generic version to reduce monomorphization cost
-unsafe fn park_internal(
-    key: usize,
-    validate: &mut FnMut() -> bool,
-    before_sleep: &mut FnMut(),
-    timed_out: &mut FnMut(usize, bool),
-    park_token: ParkToken,
-    timeout: Option<Instant>,
-) -> ParkResult {
     // Grab our thread data, this also ensures that the hash table exists
     with_thread_data(|thread_data| {
         // Lock the bucket for the given key
@@ -706,15 +706,6 @@ pub unsafe fn unpark_one<C>(key: usize, callback: C) -> UnparkResult
 where
     C: FnOnce(UnparkResult) -> UnparkToken,
 {
-    let mut c = Some(callback);
-    unpark_one_internal(key, &mut |result| c.take().unchecked_unwrap()(result))
-}
-
-// Non-generic version to reduce monomorphization cost
-unsafe fn unpark_one_internal(
-    key: usize,
-    callback: &mut FnMut(UnparkResult) -> UnparkToken,
-) -> UnparkResult {
     // Lock the bucket for the given key
     let bucket = lock_bucket(key);
 
@@ -785,6 +776,7 @@ unsafe fn unpark_one_internal(
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
#[inline]
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
// Lock the bucket for the given key
let bucket = lock_bucket(key);
@@ -869,23 +861,6 @@ where
     V: FnOnce() -> RequeueOp,
     C: FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
 {
-    let mut v = Some(validate);
-    let mut c = Some(callback);
-    unpark_requeue_internal(
-        key_from,
-        key_to,
-        &mut || v.take().unchecked_unwrap()(),
-        &mut |op, r| c.take().unchecked_unwrap()(op, r),
-    )
-}
-
-// Non-generic version to reduce monomorphization cost
-unsafe fn unpark_requeue_internal(
-    key_from: usize,
-    key_to: usize,
-    validate: &mut FnMut() -> RequeueOp,
-    callback: &mut FnMut(RequeueOp, UnparkResult) -> UnparkToken,
-) -> UnparkResult {
     // Lock the two buckets for the given key
     let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
 
@@ -1014,16 +989,6 @@ where
     F: FnMut(ParkToken) -> FilterOp,
     C: FnOnce(UnparkResult) -> UnparkToken,
 {
-    let mut c = Some(callback);
-    unpark_filter_internal(key, &mut filter, &mut |r| c.take().unchecked_unwrap()(r))
-}
-
-// Non-generic version to reduce monomorphization cost
-unsafe fn unpark_filter_internal(
-    key: usize,
-    filter: &mut FnMut(ParkToken) -> FilterOp,
-    callback: &mut FnMut(UnparkResult) -> UnparkToken,
-) -> UnparkResult {
     // Lock the bucket for the given key
     let bucket = lock_bucket(key);
 
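The blocks removed in the last few hunks all used the same indirection: the public generic function stashed its FnOnce arguments in Options and forwarded them through &mut FnMut trait objects into a non-generic *_internal function, so that the heavy queue logic was compiled only once. A rough, self-contained illustration of that pattern, with hypothetical names (not this crate's API):

// A generic FnOnce callback is wrapped in an Option and forwarded through a
// &mut dyn FnMut so the bulk of the work lives in one non-generic function.
fn public_entry<C: FnOnce(u32) -> u32>(callback: C) -> u32 {
    let mut c = Some(callback);
    // The Option is always Some at the single call site; the deleted code
    // used unchecked_unwrap() here to avoid even the panic branch.
    non_generic_entry(&mut |x| c.take().unwrap()(x))
}

// Only this function carries the real logic, so it is monomorphized once.
fn non_generic_entry(callback: &mut dyn FnMut(u32) -> u32) -> u32 {
    callback(41) + 1
}

fn main() {
    assert_eq!(public_entry(|x| x * 2), 83);
}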
7 changes: 7 additions & 0 deletions core/src/thread_parker/generic.rs
@@ -20,25 +20,29 @@ pub struct ThreadParker {
impl ThreadParker {
pub const IS_CHEAP_TO_CONSTRUCT: bool = true;

#[inline]
pub fn new() -> ThreadParker {
ThreadParker {
parked: AtomicBool::new(false),
}
}

// Prepares the parker. This should be called before adding it to the queue.
#[inline]
pub fn prepare_park(&self) {
self.parked.store(true, Ordering::Relaxed);
}

// Checks if the park timed out. This should be called while holding the
// queue lock after park_until has returned false.
#[inline]
pub fn timed_out(&self) -> bool {
self.parked.load(Ordering::Relaxed) != false
}

// Parks the thread until it is unparked. This should be called after it has
// been added to the queue, after unlocking the queue.
#[inline]
pub fn park(&self) {
while self.parked.load(Ordering::Acquire) != false {
spin_loop_hint();
@@ -48,6 +52,7 @@ impl ThreadParker {
// Parks the thread until it is unparked or the timeout is reached. This
// should be called after it has been added to the queue, after unlocking
// the queue. Returns true if we were unparked and false if we timed out.
#[inline]
pub fn park_until(&self, timeout: Instant) -> bool {
while self.parked.load(Ordering::Acquire) != false {
if Instant::now() >= timeout {
@@ -61,6 +66,7 @@ impl ThreadParker {
// Locks the parker to prevent the target thread from exiting. This is
// necessary to ensure that thread-local ThreadData objects remain valid.
// This should be called while holding the queue lock.
#[inline]
pub fn unpark_lock(&self) -> UnparkHandle {
// We don't need to lock anything, just clear the state
self.parked.store(false, Ordering::Release);
@@ -76,6 +82,7 @@ pub struct UnparkHandle(());
impl UnparkHandle {
// Wakes up the parked thread. This should be called after the queue lock is
// released to avoid blocking the queue for too long.
#[inline]
pub fn unpark(self) {}
}

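The generic ThreadParker above is a pure spin parker: prepare_park sets a flag before the thread is queued, park/park_until spin until the flag is cleared, and unpark_lock clears it with Release ordering so the Acquire loads in park observe the wakeup (unpark itself is then a no-op). A rough usage sketch of that handshake with a simplified spin parker (not the crate's queue code):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

struct SpinParker {
    parked: AtomicBool,
}

impl SpinParker {
    fn new() -> Self {
        SpinParker { parked: AtomicBool::new(false) }
    }

    // Called by the parking thread before it becomes visible to wakers.
    fn prepare_park(&self) {
        self.parked.store(true, Ordering::Relaxed);
    }

    // Called by the parking thread after it has released the queue lock.
    fn park(&self) {
        while self.parked.load(Ordering::Acquire) {
            std::hint::spin_loop();
        }
    }

    // Called by the waking thread; Release pairs with the Acquire in park().
    fn unpark(&self) {
        self.parked.store(false, Ordering::Release);
    }
}

fn main() {
    let parker = Arc::new(SpinParker::new());
    parker.prepare_park();
    let waker = {
        let parker = Arc::clone(&parker);
        thread::spawn(move || parker.unpark())
    };
    parker.park(); // returns once the waker has cleared the flag
    waker.join().unwrap();
}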
7 changes: 7 additions & 0 deletions core/src/thread_parker/linux.rs
@@ -32,25 +32,29 @@ pub struct ThreadParker {
impl ThreadParker {
pub const IS_CHEAP_TO_CONSTRUCT: bool = true;

#[inline]
pub fn new() -> ThreadParker {
ThreadParker {
futex: AtomicI32::new(0),
}
}

// Prepares the parker. This should be called before adding it to the queue.
#[inline]
pub fn prepare_park(&self) {
self.futex.store(1, Ordering::Relaxed);
}

// Checks if the park timed out. This should be called while holding the
// queue lock after park_until has returned false.
#[inline]
pub fn timed_out(&self) -> bool {
self.futex.load(Ordering::Relaxed) != 0
}

// Parks the thread until it is unparked. This should be called after it has
// been added to the queue, after unlocking the queue.
#[inline]
pub fn park(&self) {
while self.futex.load(Ordering::Acquire) != 0 {
self.futex_wait(None);
@@ -60,6 +64,7 @@ impl ThreadParker {
// Parks the thread until it is unparked or the timeout is reached. This
// should be called after it has been added to the queue, after unlocking
// the queue. Returns true if we were unparked and false if we timed out.
#[inline]
pub fn park_until(&self, timeout: Instant) -> bool {
while self.futex.load(Ordering::Acquire) != 0 {
let now = Instant::now();
@@ -111,6 +116,7 @@ impl ThreadParker {
// Locks the parker to prevent the target thread from exiting. This is
// necessary to ensure that thread-local ThreadData objects remain valid.
// This should be called while holding the queue lock.
#[inline]
pub fn unpark_lock(&self) -> UnparkHandle {
// We don't need to lock anything, just clear the state
self.futex.store(0, Ordering::Release);
@@ -129,6 +135,7 @@ pub struct UnparkHandle {
impl UnparkHandle {
// Wakes up the parked thread. This should be called after the queue lock is
// released to avoid blocking the queue for too long.
#[inline]
pub fn unpark(self) {
// The thread data may have been freed at this point, but it doesn't
// matter since the syscall will just return EFAULT in that case.
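On Linux the same protocol is backed by the futex syscall: park waits on the futex word while it is non-zero, unpark_lock clears the word, and the unpark handle issues the wake (the EFAULT note above covers the case where the waiter's ThreadData has already been freed). A simplified sketch of the underlying calls, assuming the libc crate and ignoring timeouts and error handling:

use std::ptr;
use std::sync::atomic::AtomicI32;

// Sleep if the futex word still equals `expected` when the kernel checks it;
// returns when woken, or spuriously.
fn futex_wait(futex: &AtomicI32, expected: i32) {
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            futex as *const AtomicI32,
            libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG,
            expected,
            ptr::null::<libc::timespec>(),
        );
    }
}

// Wake at most one thread blocked in futex_wait on the same word.
fn futex_wake_one(futex: &AtomicI32) {
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            futex as *const AtomicI32,
            libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
            1,
        );
    }
}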
