diff --git a/library/std/src/sys/sgx/mod.rs b/library/std/src/sys/sgx/mod.rs index 696400670e04d..65c1d0afe4608 100644 --- a/library/std/src/sys/sgx/mod.rs +++ b/library/std/src/sys/sgx/mod.rs @@ -33,6 +33,7 @@ pub mod process; pub mod stdio; pub mod thread; pub mod thread_local_key; +pub mod thread_parker; pub mod time; mod condvar; diff --git a/library/std/src/sys/sgx/thread.rs b/library/std/src/sys/sgx/thread.rs index d745a61961404..579f758c6cc33 100644 --- a/library/std/src/sys/sgx/thread.rs +++ b/library/std/src/sys/sgx/thread.rs @@ -65,39 +65,36 @@ mod task_queue { /// execution. The signal is sent once all TLS destructors have finished at /// which point no new thread locals should be created. pub mod wait_notify { - use super::super::waitqueue::{SpinMutex, WaitQueue, WaitVariable}; + use super::super::thread_parker::Parker; + use crate::pin::Pin; use crate::sync::Arc; - pub struct Notifier(Arc>>); + pub struct Notifier(Arc); impl Notifier { /// Notify the waiter. The waiter is either notified right away (if /// currently blocked in `Waiter::wait()`) or later when it calls the /// `Waiter::wait()` method. pub fn notify(self) { - let mut guard = self.0.lock(); - *guard.lock_var_mut() = true; - let _ = WaitQueue::notify_one(guard); + Pin::new(&*self.0).unpark() } } - pub struct Waiter(Arc>>); + pub struct Waiter(Arc); impl Waiter { /// Wait for a notification. If `Notifier::notify()` has already been /// called, this will return immediately, otherwise the current thread /// is blocked until notified. pub fn wait(self) { - let guard = self.0.lock(); - if *guard.lock_var() { - return; - } - WaitQueue::wait(guard, || {}); + // This is not actually `unsafe`, but it uses the `Parker` API, + // which needs `unsafe` on some platforms. + unsafe { Pin::new(&*self.0).park() } } } pub fn new() -> (Notifier, Waiter) { - let inner = Arc::new(SpinMutex::new(WaitVariable::new(false))); + let inner = Arc::new(Parker::new_internal()); (Notifier(inner.clone()), Waiter(inner)) } } diff --git a/library/std/src/sys/sgx/thread_parker.rs b/library/std/src/sys/sgx/thread_parker.rs new file mode 100644 index 0000000000000..1c55bcffb1e8c --- /dev/null +++ b/library/std/src/sys/sgx/thread_parker.rs @@ -0,0 +1,107 @@ +//! Thread parking based on SGX events. + +use super::abi::{thread, usercalls}; +use crate::io::ErrorKind; +use crate::pin::Pin; +use crate::ptr::{self, NonNull}; +use crate::sync::atomic::AtomicPtr; +use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use crate::time::Duration; +use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE}; + +// The TCS structure must be page-aligned (this is checked by EENTER), so these cannot +// be valid pointers +const EMPTY: *mut u8 = ptr::invalid_mut(1); +const NOTIFIED: *mut u8 = ptr::invalid_mut(2); + +pub struct Parker { + /// The park state. One of EMPTY, NOTIFIED or a TCS address. + /// A state change to NOTIFIED must be done with release ordering + /// and be observed with acquire ordering so that operations after + /// `thread::park` returns will not occur before the unpark message + /// was sent. + state: AtomicPtr, +} + +impl Parker { + /// Construct the thread parker. The UNIX parker implementation + /// requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + unsafe { parker.write(Parker::new_internal()) } + } + + pub(super) fn new_internal() -> Parker { + Parker { state: AtomicPtr::new(EMPTY) } + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park(self: Pin<&Self>) { + if self.state.load(Acquire) != NOTIFIED { + let mut prev = EMPTY; + loop { + // Guard against changing TCS addresses by always setting the state to + // the current value. + let tcs = thread::current().as_ptr(); + if self.state.compare_exchange(prev, tcs, Relaxed, Acquire).is_ok() { + let event = usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap(); + assert!(event & EV_UNPARK == EV_UNPARK); + prev = tcs; + } else { + // The state was definitely changed by another thread at this point. + // The only time this occurs is when the state is changed to NOTIFIED. + // We observed this change with acquire ordering, so we can simply + // change the state to EMPTY with a relaxed store. + break; + } + } + } + + // At this point, the token was definately read with acquire ordering, + // so this can be a relaxed store. + self.state.store(EMPTY, Relaxed); + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { + let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64; + let tcs = thread::current().as_ptr(); + + if self.state.load(Acquire) != NOTIFIED { + if self.state.compare_exchange(EMPTY, tcs, Relaxed, Acquire).is_ok() { + match usercalls::wait(EV_UNPARK, timeout) { + Ok(event) => assert!(event & EV_UNPARK == EV_UNPARK), + Err(e) => { + assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock)) + } + } + + // Swap to provide acquire ordering even if the timeout occurred + // before the token was set. This situation can result in spurious + // wakeups on the next call to `park_timeout`, but it is better to let + // those be handled by the user than do some perhaps unnecessary, but + // always expensive guarding. + self.state.swap(EMPTY, Acquire); + return; + } + } + + // The token was already read with `acquire` ordering, this can be a store. + self.state.store(EMPTY, Relaxed); + } + + // This implementation doesn't require `Pin`, but other implementations do. + pub fn unpark(self: Pin<&Self>) { + let state = self.state.swap(NOTIFIED, Release); + + if !matches!(state, EMPTY | NOTIFIED) { + // There is a thread waiting, wake it up. + let tcs = NonNull::new(state).unwrap(); + // This will fail if the thread has already terminated or its TCS is destroyed + // by the time the signal is sent, but that is fine. If another thread receives + // the same TCS, it will receive this notification as a spurious wakeup, but + // all users of `wait` should and (internally) do guard against those where + // necessary. + let _ = usercalls::send(EV_UNPARK, Some(tcs)); + } + } +} diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs index c789a388e05ad..505f26a4001ae 100644 --- a/library/std/src/sys_common/thread_parker/mod.rs +++ b/library/std/src/sys_common/thread_parker/mod.rs @@ -13,6 +13,8 @@ cfg_if::cfg_if! { pub use crate::sys::thread_parker::Parker; } else if #[cfg(target_family = "unix")] { pub use crate::sys::thread_parker::Parker; + } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] { + pub use crate::sys::thread_parker::Parker; } else { mod generic; pub use generic::Parker;