From 9678cece6de806693fc585225c0d6a4a5adaacd3 Mon Sep 17 00:00:00 2001 From: joboet Date: Wed, 22 Jun 2022 16:42:49 +0200 Subject: [PATCH 1/3] std: rewrite SGX thread parker --- library/std/src/sys/sgx/mod.rs | 1 + library/std/src/sys/sgx/thread_parker.rs | 93 +++++++++++++++++++ .../std/src/sys_common/thread_parker/mod.rs | 2 + 3 files changed, 96 insertions(+) create mode 100644 library/std/src/sys/sgx/thread_parker.rs diff --git a/library/std/src/sys/sgx/mod.rs b/library/std/src/sys/sgx/mod.rs index 696400670e04d..65c1d0afe4608 100644 --- a/library/std/src/sys/sgx/mod.rs +++ b/library/std/src/sys/sgx/mod.rs @@ -33,6 +33,7 @@ pub mod process; pub mod stdio; pub mod thread; pub mod thread_local_key; +pub mod thread_parker; pub mod time; mod condvar; diff --git a/library/std/src/sys/sgx/thread_parker.rs b/library/std/src/sys/sgx/thread_parker.rs new file mode 100644 index 0000000000000..f768abddd44dc --- /dev/null +++ b/library/std/src/sys/sgx/thread_parker.rs @@ -0,0 +1,93 @@ +//! Thread parking based on SGX events. + +use super::abi::{thread, usercalls}; +use crate::io::ErrorKind; +use crate::pin::Pin; +use crate::ptr::{self, NonNull}; +use crate::sync::atomic::AtomicPtr; +use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use crate::time::Duration; +use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE}; + +const EMPTY: *mut u8 = ptr::invalid_mut(0); +/// The TCS structure must be page-aligned, so this cannot be a valid pointer +const NOTIFIED: *mut u8 = ptr::invalid_mut(1); + +pub struct Parker { + state: AtomicPtr, +} + +impl Parker { + /// Construct the thread parker. The UNIX parker implementation + /// requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + unsafe { parker.write(Parker::new_internal()) } + } + + pub(super) fn new_internal() -> Parker { + Parker { state: AtomicPtr::new(EMPTY) } + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park(self: Pin<&Self>) { + let tcs = thread::current().as_ptr(); + + if self.state.load(Acquire) != NOTIFIED { + if self.state.compare_exchange(EMPTY, tcs, Acquire, Acquire).is_ok() { + // Loop to guard against spurious wakeups. + loop { + let event = usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap(); + assert!(event & EV_UNPARK == EV_UNPARK); + if self.state.load(Acquire) == NOTIFIED { + break; + } + } + } + } + + // At this point, the token was definately read with acquire ordering, + // so this can be a store. + self.state.store(EMPTY, Relaxed); + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { + let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64; + let tcs = thread::current().as_ptr(); + + if self.state.load(Acquire) != NOTIFIED { + if self.state.compare_exchange(EMPTY, tcs, Acquire, Acquire).is_ok() { + match usercalls::wait(EV_UNPARK, timeout) { + Ok(event) => assert!(event & EV_UNPARK == EV_UNPARK), + Err(e) => { + assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock)) + } + } + + // Swap to provide acquire ordering even if the timeout occurred + // before the token was set. This situation can result in spurious + // wakeups on the next call to `park_timeout`, but it is better to let + // those be handled by the user than do some perhaps unnecessary, but + // always expensive guarding. + self.state.swap(EMPTY, Acquire); + return; + } + } + + // The token was already read with `acquire` ordering, this can be a store. + self.state.store(EMPTY, Relaxed); + } + + // This implementation doesn't require `Pin`, but other implementations do. + pub fn unpark(self: Pin<&Self>) { + let state = self.state.swap(NOTIFIED, Release); + + if !matches!(state, EMPTY | NOTIFIED) { + // There is a thread waiting, wake it up. + let tcs = NonNull::new(state).unwrap(); + // This will fail if the thread has already terminated by the time the signal is send, + // but that is OK. + let _ = usercalls::send(EV_UNPARK, Some(tcs)); + } + } +} diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs index c789a388e05ad..505f26a4001ae 100644 --- a/library/std/src/sys_common/thread_parker/mod.rs +++ b/library/std/src/sys_common/thread_parker/mod.rs @@ -13,6 +13,8 @@ cfg_if::cfg_if! { pub use crate::sys::thread_parker::Parker; } else if #[cfg(target_family = "unix")] { pub use crate::sys::thread_parker::Parker; + } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] { + pub use crate::sys::thread_parker::Parker; } else { mod generic; pub use generic::Parker; From 633d46d0245bf7f60b341c8df8da230c0b689396 Mon Sep 17 00:00:00 2001 From: joboet Date: Wed, 22 Jun 2022 16:44:43 +0200 Subject: [PATCH 2/3] std: reimplement SGX thread joining to use `Parker` --- library/std/src/sys/sgx/thread.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/library/std/src/sys/sgx/thread.rs b/library/std/src/sys/sgx/thread.rs index d745a61961404..579f758c6cc33 100644 --- a/library/std/src/sys/sgx/thread.rs +++ b/library/std/src/sys/sgx/thread.rs @@ -65,39 +65,36 @@ mod task_queue { /// execution. The signal is sent once all TLS destructors have finished at /// which point no new thread locals should be created. pub mod wait_notify { - use super::super::waitqueue::{SpinMutex, WaitQueue, WaitVariable}; + use super::super::thread_parker::Parker; + use crate::pin::Pin; use crate::sync::Arc; - pub struct Notifier(Arc>>); + pub struct Notifier(Arc); impl Notifier { /// Notify the waiter. The waiter is either notified right away (if /// currently blocked in `Waiter::wait()`) or later when it calls the /// `Waiter::wait()` method. pub fn notify(self) { - let mut guard = self.0.lock(); - *guard.lock_var_mut() = true; - let _ = WaitQueue::notify_one(guard); + Pin::new(&*self.0).unpark() } } - pub struct Waiter(Arc>>); + pub struct Waiter(Arc); impl Waiter { /// Wait for a notification. If `Notifier::notify()` has already been /// called, this will return immediately, otherwise the current thread /// is blocked until notified. pub fn wait(self) { - let guard = self.0.lock(); - if *guard.lock_var() { - return; - } - WaitQueue::wait(guard, || {}); + // This is not actually `unsafe`, but it uses the `Parker` API, + // which needs `unsafe` on some platforms. + unsafe { Pin::new(&*self.0).park() } } } pub fn new() -> (Notifier, Waiter) { - let inner = Arc::new(SpinMutex::new(WaitVariable::new(false))); + let inner = Arc::new(Parker::new_internal()); (Notifier(inner.clone()), Waiter(inner)) } } From a40d300100a5e48cb66f5261738496dbacf11f99 Mon Sep 17 00:00:00 2001 From: joboet Date: Mon, 5 Sep 2022 10:19:12 +0200 Subject: [PATCH 3/3] std: clarify semantics of SGX parker --- library/std/src/sys/sgx/thread_parker.rs | 44 ++++++++++++++++-------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/library/std/src/sys/sgx/thread_parker.rs b/library/std/src/sys/sgx/thread_parker.rs index f768abddd44dc..1c55bcffb1e8c 100644 --- a/library/std/src/sys/sgx/thread_parker.rs +++ b/library/std/src/sys/sgx/thread_parker.rs @@ -9,11 +9,17 @@ use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use crate::time::Duration; use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE}; -const EMPTY: *mut u8 = ptr::invalid_mut(0); -/// The TCS structure must be page-aligned, so this cannot be a valid pointer -const NOTIFIED: *mut u8 = ptr::invalid_mut(1); +// The TCS structure must be page-aligned (this is checked by EENTER), so these cannot +// be valid pointers +const EMPTY: *mut u8 = ptr::invalid_mut(1); +const NOTIFIED: *mut u8 = ptr::invalid_mut(2); pub struct Parker { + /// The park state. One of EMPTY, NOTIFIED or a TCS address. + /// A state change to NOTIFIED must be done with release ordering + /// and be observed with acquire ordering so that operations after + /// `thread::park` returns will not occur before the unpark message + /// was sent. state: AtomicPtr, } @@ -30,23 +36,28 @@ impl Parker { // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. pub unsafe fn park(self: Pin<&Self>) { - let tcs = thread::current().as_ptr(); - if self.state.load(Acquire) != NOTIFIED { - if self.state.compare_exchange(EMPTY, tcs, Acquire, Acquire).is_ok() { - // Loop to guard against spurious wakeups. - loop { + let mut prev = EMPTY; + loop { + // Guard against changing TCS addresses by always setting the state to + // the current value. + let tcs = thread::current().as_ptr(); + if self.state.compare_exchange(prev, tcs, Relaxed, Acquire).is_ok() { let event = usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap(); assert!(event & EV_UNPARK == EV_UNPARK); - if self.state.load(Acquire) == NOTIFIED { - break; - } + prev = tcs; + } else { + // The state was definitely changed by another thread at this point. + // The only time this occurs is when the state is changed to NOTIFIED. + // We observed this change with acquire ordering, so we can simply + // change the state to EMPTY with a relaxed store. + break; } } } // At this point, the token was definately read with acquire ordering, - // so this can be a store. + // so this can be a relaxed store. self.state.store(EMPTY, Relaxed); } @@ -56,7 +67,7 @@ impl Parker { let tcs = thread::current().as_ptr(); if self.state.load(Acquire) != NOTIFIED { - if self.state.compare_exchange(EMPTY, tcs, Acquire, Acquire).is_ok() { + if self.state.compare_exchange(EMPTY, tcs, Relaxed, Acquire).is_ok() { match usercalls::wait(EV_UNPARK, timeout) { Ok(event) => assert!(event & EV_UNPARK == EV_UNPARK), Err(e) => { @@ -85,8 +96,11 @@ impl Parker { if !matches!(state, EMPTY | NOTIFIED) { // There is a thread waiting, wake it up. let tcs = NonNull::new(state).unwrap(); - // This will fail if the thread has already terminated by the time the signal is send, - // but that is OK. + // This will fail if the thread has already terminated or its TCS is destroyed + // by the time the signal is sent, but that is fine. If another thread receives + // the same TCS, it will receive this notification as a spurious wakeup, but + // all users of `wait` should and (internally) do guard against those where + // necessary. let _ = usercalls::send(EV_UNPARK, Some(tcs)); } }