diff --git a/.github/workflows/nostd.yml b/.github/workflows/nostd.yml new file mode 100644 index 0000000..d94ae5d --- /dev/null +++ b/.github/workflows/nostd.yml @@ -0,0 +1,24 @@ +on: + push: + branches: [main] + pull_request: +name: no-std +jobs: + nostd: + runs-on: ubuntu-latest + name: ${{ matrix.target }} + strategy: + matrix: + target: [thumbv7m-none-eabi, aarch64-unknown-none] + steps: + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + target: ${{ matrix.target }} + - uses: actions/checkout@v2 + - name: cargo check + uses: actions-rs/cargo@v1 + with: + command: check + args: --target ${{ matrix.target }} --no-default-features \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 451e379..1425691 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,5 +14,9 @@ categories = ["concurrency"] [dependencies] slab = "0.4" +[features] +default = ["std"] +std = [] + [target.'cfg(loom)'.dependencies] loom = "0.4.0" diff --git a/src/aliasing.rs b/src/aliasing.rs index 96a5ea7..6d4c225 100644 --- a/src/aliasing.rs +++ b/src/aliasing.rs @@ -128,9 +128,10 @@ //! //! But this warrants repeating: **your `D` types for `Aliased` _must_ be private**. -use std::marker::PhantomData; -use std::mem::MaybeUninit; -use std::ops::Deref; +use alloc::{boxed::Box, string::String, vec::Vec}; +use core::marker::PhantomData; +use core::mem::MaybeUninit; +use core::ops::Deref; // Just to make the doc comment linking work. #[allow(unused_imports)] @@ -183,7 +184,7 @@ where // a) the T is behind a MaybeUninit, and so will cannot be accessed safely; and // b) we only expose _either_ &T while aliased, or &mut after the aliasing ends. Aliased { - aliased: std::ptr::read(&self.aliased), + aliased: core::ptr::read(&self.aliased), drop_behavior: PhantomData, _no_auto_send: PhantomData, } @@ -211,7 +212,7 @@ where pub unsafe fn change_drop(self) -> Aliased { Aliased { // safety: - aliased: std::ptr::read(&self.aliased), + aliased: core::ptr::read(&self.aliased), drop_behavior: PhantomData, _no_auto_send: PhantomData, } @@ -247,7 +248,7 @@ where // That T has not been dropped (getting a Aliased is unsafe). // T is no longer aliased (by the safety assumption of getting a Aliased), // so we are allowed to re-take ownership of the T. 
- unsafe { std::ptr::drop_in_place(self.aliased.as_mut_ptr()) } + unsafe { core::ptr::drop_in_place(self.aliased.as_mut_ptr()) } } } } @@ -276,7 +277,7 @@ where } } -use std::hash::{Hash, Hasher}; +use core::hash::{Hash, Hasher}; impl Hash for Aliased where D: DropBehavior, @@ -290,7 +291,7 @@ where } } -use std::fmt; +use core::fmt; impl fmt::Debug for Aliased where D: DropBehavior, @@ -323,7 +324,7 @@ where D: DropBehavior, T: PartialOrd, { - fn partial_cmp(&self, other: &Self) -> Option { + fn partial_cmp(&self, other: &Self) -> Option { self.as_ref().partial_cmp(other.as_ref()) } @@ -349,12 +350,12 @@ where D: DropBehavior, T: Ord, { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { self.as_ref().cmp(other.as_ref()) } } -use std::borrow::Borrow; +use core::borrow::Borrow; impl Borrow for Aliased where D: DropBehavior, @@ -385,6 +386,8 @@ where self.as_ref() } } + +#[cfg(feature = "std")] impl Borrow for Aliased where D: DropBehavior, @@ -410,7 +413,7 @@ where self.as_ref() } } -impl Borrow for Aliased, D> +impl Borrow for Aliased, D> where T: ?Sized, D: DropBehavior, @@ -419,7 +422,7 @@ where self.as_ref() } } -impl Borrow for Aliased, D> +impl Borrow for Aliased, D> where T: ?Sized, D: DropBehavior, diff --git a/src/handle_list.rs b/src/handle_list.rs new file mode 100644 index 0000000..3052c56 --- /dev/null +++ b/src/handle_list.rs @@ -0,0 +1,351 @@ +use core::{ + fmt::{Debug, Formatter}, + marker::PhantomData, + sync::atomic::AtomicBool, +}; + +use crate::sync::{Arc, AtomicPtr, AtomicUsize, Ordering}; +use alloc::boxed::Box; + +// TODO +// * For now I'm just using Ordering::SeqCst, because I havent really looked into what exactly we +// need for the Ordering, so this should probably be made more accurate in the Future + +/// A Lock-Free List of Handles +pub struct HandleList { + inner: Arc, +} + +struct InnerList { + // The Head of the List + head: AtomicPtr, +} + +/// A Snapshot of the HandleList +/// +/// Iterating over this Snapshot only yields the Entries that were present when this Snapshot was taken +pub struct ListSnapshot { + // The Head-Ptr at the time of creation + head: *const ListEntry, + + // This entry exists to make sure that we keep the inner List alive and it wont be freed from under us + _list: Arc, +} + +/// An Iterator over the Entries in a Snapshot +pub struct SnapshotIter<'s> { + // A Pointer to the next Entry that will be yielded + current: *const ListEntry, + _marker: PhantomData<&'s ()>, +} + +struct ListEntry { + data: Arc, + used: AtomicBool, + // Stores the number of following entries in the list + followers: usize, + // We can use a normal Ptr here because we never append or remove Entries and only add new Entries + // by changing the Head, so we never modify this Ptr and therefore dont need an AtomicPtr + next: *const Self, +} + +/// The EntryHandle is needed to allow for reuse of entries, after a handle is dropped +#[derive(Debug)] +pub struct EntryHandle { + counter: Arc, + elem: *const ListEntry, +} + +impl HandleList { + /// Creates a new empty HandleList + pub fn new() -> Self { + Self { + inner: Arc::new(InnerList { + head: AtomicPtr::new(core::ptr::null_mut()), + }), + } + } + + fn len(&self) -> usize { + let head = self.inner.head.load(Ordering::SeqCst); + if head.is_null() { + return 0; + } + + // Safety + // The prt is not null and as entries are never deallocated, so the ptr should always be + // valid + unsafe { (*head).followers + 1 } + } + + /// Obtains a new Entry + pub fn 
get_entry(&self) -> EntryHandle { + if let Some(entry) = self.try_acquire() { + return entry; + } + + self.new_entry() + } + + fn try_acquire(&self) -> Option { + let mut current: *const ListEntry = self.inner.head.load(Ordering::SeqCst); + while !current.is_null() { + // Safety + // The ptr is not null and entries are never deallocated + let current_entry = unsafe { &*current }; + + if !current_entry.used.load(Ordering::SeqCst) { + if current_entry + .used + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + return Some(EntryHandle { + counter: current_entry.data.clone(), + elem: current, + }); + } + } + + current = current_entry.next; + } + + None + } + + /// Adds a new Entry to the List and returns the Counter for the Entry + fn new_entry(&self) -> EntryHandle { + let count = Arc::new(AtomicUsize::new(0)); + + self.add_counter(count) + } + + /// Adds a new Counter to the List of Entries, increasing the size of the List + fn add_counter(&self, count: Arc) -> EntryHandle { + let counter = count.clone(); + + let n_node = Box::new(ListEntry { + data: count, + used: AtomicBool::new(true), + followers: 0, + next: core::ptr::null(), + }); + let n_node_ptr = Box::into_raw(n_node); + + let mut current_head = self.inner.head.load(Ordering::SeqCst); + loop { + // Safety + // This is save, because we have not stored the Ptr elsewhere so we have exclusive + // access. + // The Ptr is also still valid, because we never free Entries on the List + unsafe { (*n_node_ptr).next = current_head }; + + // Update the follower count of the new entry + if !current_head.is_null() { + // Safety + // This is save, because we know the Ptr is not null and we know that + // Entries will never be deallocated, so the ptr still refers to a valid + // entry. 
+ unsafe { (*n_node_ptr).followers = (*current_head).followers + 1 }; + } + + // Attempt to add the Entry to the List by setting it as the new Head + match self.inner.head.compare_exchange( + current_head, + n_node_ptr, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => { + return EntryHandle { + counter, + elem: n_node_ptr, + } + } + Err(n_head) => { + // Store the found Head-Ptr to avoid an extra load at the start of every loop + current_head = n_head; + } + } + } + } + + /// Creates a new Snapshot of the List at this Point in Time + pub fn snapshot(&self) -> ListSnapshot { + ListSnapshot { + head: self.inner.head.load(Ordering::SeqCst), + _list: self.inner.clone(), + } + } + + /// Inserts the Items of the Iterator, but in reverse order + #[cfg(test)] + pub fn extend(&self, iter: I) + where + I: IntoIterator>, + { + for item in iter.into_iter() { + self.add_counter(item); + } + } +} + +impl Default for HandleList { + fn default() -> Self { + Self::new() + } +} +impl Debug for HandleList { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + // TODO + // Figure out how exactly we want the Debug output to look + write!(f, "HandleList") + } +} +impl Clone for HandleList { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +impl ListSnapshot { + /// Obtain an iterator over the Entries in this Snapshot + pub fn iter(&self) -> SnapshotIter<'_> { + SnapshotIter { + current: self.head, + _marker: PhantomData {}, + } + } + + /// Get the Length of the current List of entries + pub fn len(&self) -> usize { + if self.head.is_null() { + return 0; + } + + unsafe { (*self.head).followers + 1 } + } +} + +impl<'s> Iterator for SnapshotIter<'s> { + // TODO + // Maybe don't return an owned Value here + type Item = &'s AtomicUsize; + + fn next(&mut self) -> Option { + if self.current.is_null() { + return None; + } + + // # Safety + // The Ptr is not null, because of the previous if-statement. + // + // The Data is also not freed yet, because we know that at least one snapshot is still + // alive (because we bind to it through the lifetime) and as long as at least one + // snapshot exists, the InnerList will not be freed or dropped. This means that the entries + // in the List are also not yet freed and therefore its safe to still access them + let entry = unsafe { &*self.current }; + + self.current = entry.next; + + Some(&entry.data) + } +} + +impl Drop for InnerList { + fn drop(&mut self) { + // We iterate over all the Entries of the List and free every Entry of the List + let mut current = *self.head.get_mut(); + while !current.is_null() { + // # Safety + // This is safe, because we only enter the loop body if the Pointer is not null and we + // also know that the Entry is not yet freed because we only free them once we are dropped + // and because we are now in Drop, noone before us has freed any Entry on the List + let next = unsafe { &*current }.next as *mut ListEntry; + + // # Safety + // 1. We know that the Pointer was allocated using Box::new + // 2. We are the only ones to convert it back into a Box again, because we only ever do + // this when the InnerList is dropped (now) and then also free all the Entries so there + // is no chance of one entry surviving or still being stored somewhere for later use. + // 3. 
There is also no other reference to the Element, because otherwise the InnerList + // could not be dropped and we would not be in this section + let entry = unsafe { Box::from_raw(current) }; + drop(entry); + + current = next; + } + } +} + +impl EntryHandle { + pub fn counter(&self) -> &AtomicUsize { + &self.counter + } +} +impl Drop for EntryHandle { + fn drop(&mut self) { + let elem = unsafe { &*self.elem }; + elem.used.store(false, Ordering::SeqCst); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_list() { + let list = HandleList::new(); + drop(list); + } + + #[test] + fn empty_snapshot() { + let list = HandleList::new(); + + let snapshot = list.snapshot(); + + // Assert that the Iterator over the Snapshot is empty + assert_eq!(0, snapshot.iter().count()); + } + + #[test] + fn snapshots_and_entries() { + let list = HandleList::new(); + + let empty_snapshot = list.snapshot(); + assert_eq!(0, empty_snapshot.iter().count()); + + let entry = list.new_entry(); + entry.counter().store(1, Ordering::SeqCst); + + // Make sure that the Snapshot we got before adding a new Entry is still empty + assert_eq!(0, empty_snapshot.iter().count()); + + let second_snapshot = list.snapshot(); + assert_eq!(1, second_snapshot.iter().count()); + + let snapshot_entry = second_snapshot.iter().next().unwrap(); + assert_eq!( + entry.counter().load(Ordering::SeqCst), + snapshot_entry.load(Ordering::SeqCst) + ); + } + + #[test] + fn entry_reuse() { + let list = HandleList::new(); + + assert_eq!(0, list.len()); + + let entry1 = list.get_entry(); + assert_eq!(1, list.len()); + drop(entry1); + + let entry2 = list.get_entry(); + assert_eq!(1, list.len()); + } +} diff --git a/src/lib.rs b/src/lib.rs index e535198..3859083 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -166,6 +166,7 @@ //! closure instead. Instead, consider using [`ReadGuard::map`] and [`ReadGuard::try_map`], which //! (like `RefCell`'s [`Ref::map`](std::cell::Ref::map)) allow you to provide a guarded reference //! deeper into your data structure. +#![cfg_attr(not(feature = "std"), no_std)] #![warn( missing_docs, rust_2018_idioms, @@ -174,11 +175,14 @@ )] #![allow(clippy::type_complexity)] +// Needed for no_std support +extern crate alloc; + mod sync; -use crate::sync::{Arc, AtomicUsize, Mutex}; +use alloc::boxed::Box; -type Epochs = Arc>>>; +type Epochs = handle_list::HandleList; mod write; pub use crate::write::Taken; @@ -187,6 +191,8 @@ pub use crate::write::WriteHandle; mod read; pub use crate::read::{ReadGuard, ReadHandle, ReadHandleFactory}; +mod handle_list; + pub mod aliasing; /// Types that can incorporate operations of type `O`. @@ -263,20 +269,47 @@ pub trait Absorb { fn sync_with(&mut self, first: &Self); } +/// Construct a new write and read handle pair from an empty data structure with +/// [`yield_now`](std::thread::yield_now) as the yield function. +/// +/// See [`new_from_empty_with_yield`] for a more detailed explanation +#[cfg(feature = "std")] +pub fn new_from_empty(t: T) -> (WriteHandle, ReadHandle) +where + T: Absorb + Clone, +{ + new_from_empty_with_yield(t, std::thread::yield_now) +} + /// Construct a new write and read handle pair from an empty data structure. /// /// The type must implement `Clone` so we can construct the second copy from the first. -pub fn new_from_empty(t: T) -> (WriteHandle, ReadHandle) +/// +/// The `yield_fn` will be called by the writer while its waiting for all the readers to move on +/// after a refresh. This allows for more efficient waiting instead of simply spinning. 
+pub fn new_from_empty_with_yield(t: T, yield_fn: fn()) -> (WriteHandle, ReadHandle) where T: Absorb + Clone, { - let epochs = Default::default(); + let epochs: Epochs = Default::default(); - let r = ReadHandle::new(t.clone(), Arc::clone(&epochs)); - let w = WriteHandle::new(t, epochs, r.clone()); + let r = ReadHandle::new(t.clone(), epochs.clone()); + let w = WriteHandle::new_with_yield(t, epochs, r.clone(), yield_fn); (w, r) } +/// Construct a new write and read handle pair from the data structure default, with +/// [`yield_now`](std::thread::yield_now) as the yield function. +/// +/// See [`new_with_yield`] for a more detailed explanation. +#[cfg(feature = "std")] +pub fn new() -> (WriteHandle, ReadHandle) +where + T: Absorb + Default, +{ + new_with_yield(std::thread::yield_now) +} + /// Construct a new write and read handle pair from the data structure default. /// /// The type must implement `Default` so we can construct two empty instances. You must ensure that @@ -287,13 +320,16 @@ where /// /// If your type's `Default` implementation does not guarantee this, you can use `new_from_empty`, /// which relies on `Clone` instead of `Default`. -pub fn new() -> (WriteHandle, ReadHandle) +/// +/// The `yield_fn` will be called by the writer while its waiting for all the readers to move on +/// after a refresh. This allows for more efficient waiting instead of simply spinning. +pub fn new_with_yield(yield_fn: fn()) -> (WriteHandle, ReadHandle) where T: Absorb + Default, { - let epochs = Default::default(); + let epochs: Epochs = Default::default(); - let r = ReadHandle::new(T::default(), Arc::clone(&epochs)); - let w = WriteHandle::new(T::default(), epochs, r.clone()); + let r = ReadHandle::new(T::default(), epochs.clone()); + let w = WriteHandle::new_with_yield(T::default(), epochs, r.clone(), yield_fn); (w, r) } diff --git a/src/read.rs b/src/read.rs index d38784f..fefbcfe 100644 --- a/src/read.rs +++ b/src/read.rs @@ -1,8 +1,9 @@ use crate::sync::{fence, Arc, AtomicPtr, AtomicUsize, Ordering}; -use std::cell::Cell; -use std::fmt; -use std::marker::PhantomData; -use std::ptr::NonNull; +use alloc::boxed::Box; +use core::cell::Cell; +use core::fmt; +use core::marker::PhantomData; +use core::ptr::NonNull; // To make [`WriteHandle`] and friends work. #[cfg(doc)] @@ -39,8 +40,8 @@ pub use factory::ReadHandleFactory; pub struct ReadHandle { pub(crate) inner: Arc>, pub(crate) epochs: crate::Epochs, - epoch: Arc, - epoch_i: usize, + // epoch: Arc, + epoch: crate::handle_list::EntryHandle, enters: Cell, // `ReadHandle` is _only_ Send if T is Sync. If T is !Sync, then it's not okay for us to expose @@ -53,11 +54,8 @@ unsafe impl Send for ReadHandle where T: Sync {} impl Drop for ReadHandle { fn drop(&mut self) { - // epoch must already be even for us to have &mut self, - // so okay to lock since we're not holding up the epoch anyway. 
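For reference, a minimal sketch of how a downstream caller might use the new constructor pair. It assumes the usual `Absorb` contract (`absorb_first` plus `sync_with`) and borrows a `CounterAddOp`-style counter mirroring the crate's test utilities; `demo` is an illustrative name, and on no_std targets a spin hint such as `core::hint::spin_loop` can stand in for `std::thread::yield_now`:

```rust
use left_right::Absorb;

// Hypothetical op type mirroring the CounterAddOp test utility.
struct CounterAddOp(i32);

impl Absorb<CounterAddOp> for i32 {
    fn absorb_first(&mut self, op: &mut CounterAddOp, _other: &Self) {
        *self += op.0;
    }
    fn sync_with(&mut self, first: &Self) {
        *self = *first;
    }
}

fn demo() {
    // `core::hint::spin_loop` has the plain `fn()` signature `yield_fn` expects;
    // std users can keep calling `new_from_empty`, which plugs in
    // `std::thread::yield_now` behind the `std` feature.
    let (mut w, r) = left_right::new_from_empty_with_yield(0i32, core::hint::spin_loop);
    w.append(CounterAddOp(2));
    w.publish();
    assert_eq!(*r.enter().unwrap(), 2);
}
```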
- let e = self.epochs.lock().unwrap().remove(self.epoch_i); - assert!(Arc::ptr_eq(&e, &self.epoch)); - assert_eq!(self.enters.get(), 0); + // We dont need a Drop implementation as of now, because the Epoch for this Handle will not + // be freed again } } @@ -65,14 +63,14 @@ impl fmt::Debug for ReadHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReadHandle") .field("epochs", &self.epochs) - .field("epoch", &self.epoch) + .field("epoch", &self.epoch.counter()) .finish() } } impl Clone for ReadHandle { fn clone(&self) -> Self { - ReadHandle::new_with_arc(Arc::clone(&self.inner), Arc::clone(&self.epochs)) + ReadHandle::new_with_arc(Arc::clone(&self.inner), self.epochs.clone()) } } @@ -84,15 +82,12 @@ impl ReadHandle { } fn new_with_arc(inner: Arc>, epochs: crate::Epochs) -> Self { - // tell writer about our epoch tracker - let epoch = Arc::new(AtomicUsize::new(0)); - // okay to lock, since we're not holding up the epoch - let epoch_i = epochs.lock().unwrap().insert(Arc::clone(&epoch)); + // Obtain a new Epoch-Entry + let epoch = epochs.get_entry(); Self { epochs, epoch, - epoch_i, enters: Cell::new(0), inner, _unimpl_send: PhantomData, @@ -104,7 +99,7 @@ impl ReadHandle { pub fn factory(&self) -> ReadHandleFactory { ReadHandleFactory { inner: Arc::clone(&self.inner), - epochs: Arc::clone(&self.epochs), + epochs: self.epochs.clone(), } } } @@ -165,7 +160,7 @@ impl ReadHandle { // in all cases, using a pointer we read *after* updating our epoch is safe. // so, update our epoch tracker. - self.epoch.fetch_add(1, Ordering::AcqRel); + self.epoch.counter().fetch_add(1, Ordering::AcqRel); // ensure that the pointer read happens strictly after updating the epoch fence(Ordering::SeqCst); @@ -187,7 +182,7 @@ impl ReadHandle { } else { // the writehandle has been dropped, and so has both copies, // so restore parity and return None - self.epoch.fetch_add(1, Ordering::AcqRel); + self.epoch.counter().fetch_add(1, Ordering::AcqRel); None } } diff --git a/src/read/factory.rs b/src/read/factory.rs index 147e6d2..3db8471 100644 --- a/src/read/factory.rs +++ b/src/read/factory.rs @@ -1,6 +1,6 @@ use super::ReadHandle; use crate::sync::{Arc, AtomicPtr}; -use std::fmt; +use core::fmt; /// A type that is both `Sync` and `Send` and lets you produce new [`ReadHandle`] instances. /// @@ -25,7 +25,7 @@ impl Clone for ReadHandleFactory { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), - epochs: Arc::clone(&self.epochs), + epochs: self.epochs.clone(), } } } @@ -34,6 +34,6 @@ impl ReadHandleFactory { /// Produce a new [`ReadHandle`] to the same left-right data structure as this factory was /// originally produced from. 
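The `None` branch in `enter` above is observable from the public API. A small sketch of what callers should expect once the writer goes away — `noop_yield` and `reader_outlives_writer` are illustrative names, and the `i32: Absorb<CounterAddOp>` impl from the earlier sketch is assumed:

```rust
fn noop_yield() {}

fn reader_outlives_writer() {
    let (mut w, r) = left_right::new_from_empty_with_yield(0i32, noop_yield);
    w.append(CounterAddOp(1));
    w.publish();
    assert_eq!(*r.enter().unwrap(), 1);

    // Once the WriteHandle (and with it both copies) is gone, `enter`
    // restores epoch parity and reports the absence with `None`.
    drop(w);
    assert!(r.enter().is_none());
}
```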
pub fn handle(&self) -> ReadHandle { - ReadHandle::new_with_arc(Arc::clone(&self.inner), Arc::clone(&self.epochs)) + ReadHandle::new_with_arc(Arc::clone(&self.inner), self.epochs.clone()) } } diff --git a/src/read/guard.rs b/src/read/guard.rs index 308ef62..f268499 100644 --- a/src/read/guard.rs +++ b/src/read/guard.rs @@ -1,10 +1,11 @@ use crate::sync::{AtomicUsize, Ordering}; -use std::cell::Cell; -use std::mem; +use core::cell::Cell; +use core::mem; #[derive(Debug, Copy, Clone)] pub(super) struct ReadHandleState<'rh> { - pub(super) epoch: &'rh AtomicUsize, + // pub(super) epoch: &'rh AtomicUsize, + pub(super) epoch: &'rh crate::handle_list::EntryHandle, pub(super) enters: &'rh Cell, } @@ -107,7 +108,7 @@ impl<'rh, T: ?Sized> AsRef for ReadGuard<'rh, T> { } } -impl<'rh, T: ?Sized> std::ops::Deref for ReadGuard<'rh, T> { +impl<'rh, T: ?Sized> core::ops::Deref for ReadGuard<'rh, T> { type Target = T; fn deref(&self) -> &Self::Target { self.t @@ -120,7 +121,7 @@ impl<'rh, T: ?Sized> Drop for ReadGuard<'rh, T> { self.handle.enters.set(enters); if enters == 0 { // We are the last guard to be dropped -- now release our epoch. - self.handle.epoch.fetch_add(1, Ordering::AcqRel); + self.handle.epoch.counter().fetch_add(1, Ordering::AcqRel); } } } diff --git a/src/sync.rs b/src/sync.rs index 3a87edf..c5631bf 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -17,6 +17,6 @@ pub(crate) fn fence(ord: Ordering) { } #[cfg(not(loom))] -pub(crate) use std::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; +pub(crate) use alloc::sync::Arc; #[cfg(not(loom))] -pub(crate) use std::sync::{Arc, Mutex, MutexGuard}; +pub(crate) use core::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; diff --git a/src/utilities.rs b/src/utilities.rs index 88520e8..29420f3 100644 --- a/src/utilities.rs +++ b/src/utilities.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +use alloc::boxed::Box; + #[cfg(test)] #[derive(Debug)] pub struct CounterAddOp(pub i32); diff --git a/src/write.rs b/src/write.rs index 747367e..7a9f098 100644 --- a/src/write.rs +++ b/src/write.rs @@ -1,14 +1,15 @@ +use crate::handle_list::ListSnapshot; use crate::read::ReadHandle; use crate::Absorb; -use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering}; -use std::collections::VecDeque; -use std::marker::PhantomData; -use std::ops::DerefMut; -use std::ptr::NonNull; +use crate::sync::{fence, Arc, Ordering}; +use alloc::{boxed::Box, collections::VecDeque, vec::Vec}; +use core::fmt; +use core::marker::PhantomData; +use core::ops::DerefMut; +use core::ptr::NonNull; #[cfg(test)] -use std::sync::atomic::AtomicBool; -use std::{fmt, thread}; +use core::sync::atomic::AtomicBool; /// A writer handle to a left-right guarded data structure. /// @@ -42,6 +43,9 @@ where second: bool, /// If we call `Self::take` the drop needs to be different. taken: bool, + /// This function will be used to yield the current execution instead of just spinning while + /// waiting for all readers to move on + yield_fn: fn(), } // safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take @@ -83,7 +87,7 @@ pub struct Taken, O> { _marker: PhantomData, } -impl + std::fmt::Debug, O> std::fmt::Debug for Taken { +impl + core::fmt::Debug, O> core::fmt::Debug for Taken { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Taken") .field( @@ -145,7 +149,7 @@ where /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and /// returns the other copy as a [`Taken`] smart pointer. 
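Since a `ReadHandle` itself stays thread-local, the factory remains the intended way to hand out readers to other threads. A short std-only sketch (again assuming the counter `Absorb` impl from above; `fan_out_readers` is an illustrative name):

```rust
fn fan_out_readers() {
    let (mut w, r) = left_right::new_from_empty_with_yield(0i32, std::thread::yield_now);
    let factory = r.factory();

    w.append(CounterAddOp(5));
    w.publish();

    let workers: Vec<_> = (0..4)
        .map(|_| {
            // The factory is Send + Sync, so clones can move across threads;
            // each thread then materializes its own ReadHandle from it.
            let factory = factory.clone();
            std::thread::spawn(move || *factory.handle().enter().unwrap())
        })
        .collect();

    for worker in workers {
        assert_eq!(worker.join().unwrap(), 5);
    }
}
```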
fn take_inner(&mut self) -> Option> { - use std::ptr; + use core::ptr; // Can only take inner once. if self.taken { return None; @@ -168,9 +172,8 @@ where let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release); // now, wait for all readers to depart - let epochs = Arc::clone(&self.epochs); - let mut epochs = epochs.lock().unwrap(); - self.wait(&mut epochs); + let epoch_snapshot = self.epochs.snapshot(); + self.wait(&epoch_snapshot); // ensure that the subsequent epoch reads aren't re-ordered to before the swap fence(Ordering::SeqCst); @@ -212,7 +215,12 @@ impl WriteHandle where T: Absorb, { - pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle) -> Self { + pub(crate) fn new_with_yield( + w_handle: T, + epochs: crate::Epochs, + r_handle: ReadHandle, + yield_fn: fn(), + ) -> Self { Self { epochs, // safety: Box is not null and covariant. @@ -228,10 +236,11 @@ where first: true, second: true, taken: false, + yield_fn, } } - fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab>>) { + fn wait(&mut self, epochs: &ListSnapshot) { let mut iter = 0; let mut starti = 0; @@ -239,11 +248,13 @@ where { self.is_waiting.store(true, Ordering::Relaxed); } - // we're over-estimating here, but slab doesn't expose its max index - self.last_epochs.resize(epochs.capacity(), 0); + + // make sure we have enough space for all the epochs in the current snapshot + self.last_epochs.resize(epochs.len(), 0); + 'retry: loop { // read all and see if all have changed (which is likely) - for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) { + for (ii, (ri, epoch)) in epochs.iter().enumerate().enumerate().skip(starti) { // if the reader's epoch was even last we read it (which was _after_ the swap), // then they either do not have the pointer, or must have read the pointer strictly // after the swap. in either case, they cannot be using the old pointer value (what @@ -275,7 +286,7 @@ where if iter != 20 { iter += 1; } else { - thread::yield_now(); + (self.yield_fn)(); } } @@ -304,13 +315,12 @@ where // flag has been observed to be on for two subsequent iterations (there still may be some // readers present since we did the previous refresh) // - // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will - // only block on pre-existing readers, and they are never waiting to push onto epochs - // unless they have finished reading. - let epochs = Arc::clone(&self.epochs); - let mut epochs = epochs.lock().unwrap(); - - self.wait(&mut epochs); + // NOTE: + // Here we take a Snapshot of the currently existing Readers and only consider these + // as all new readers will already be on the other copy, so there is no need to wait for + // them + let epoch_snapshot = self.epochs.snapshot(); + self.wait(&epoch_snapshot); if !self.first { // all the readers have left! @@ -375,7 +385,7 @@ where // ensure that the subsequent epoch reads aren't re-ordered to before the swap fence(Ordering::SeqCst); - for (ri, epoch) in epochs.iter() { + for (ri, epoch) in epoch_snapshot.iter().enumerate() { self.last_epochs[ri] = epoch.load(Ordering::Acquire); } @@ -409,7 +419,7 @@ where /// /// Its effects will not be exposed to readers until you call [`publish`](Self::publish). 
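The snapshot-based `wait` below still relies on the same per-reader parity rule as the old slab-based version. A tiny self-contained sketch of that predicate (the function names are illustrative, not part of the diff):

```rust
/// Returns true if a reader whose epoch was `last` at swap time can no longer
/// be using the old copy, given its current epoch `now`.
fn reader_is_past_swap(last: usize, now: usize) -> bool {
    // Even epoch at swap time: the reader held no guard, so any guard it
    // takes later reads the pointer strictly after the swap.
    last % 2 == 0
        // Odd epoch, but it has changed since: that read has finished.
        || now != last
}

fn parity_examples() {
    assert!(reader_is_past_swap(4, 4));  // idle reader
    assert!(reader_is_past_swap(3, 5));  // was reading, has since moved on
    assert!(!reader_is_past_swap(3, 3)); // still inside the same read
}
```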
pub fn append(&mut self, op: O) -> &mut Self { - self.extend(std::iter::once(op)); + self.extend(core::iter::once(op)); self } @@ -442,7 +452,7 @@ where } // allow using write handle for reads -use std::ops::Deref; +use core::ops::Deref; impl Deref for WriteHandle where T: Absorb, @@ -558,14 +568,17 @@ struct CheckWriteHandleSend; #[cfg(test)] mod tests { - use crate::sync::{AtomicUsize, Mutex, Ordering}; + + use crate::sync::{AtomicUsize, Ordering}; use crate::Absorb; use slab::Slab; include!("./utilities.rs"); + fn test_yield() {} + #[test] fn append_test() { - let (mut w, _r) = crate::new::(); + let (mut w, _r) = crate::new_with_yield::(test_yield); assert_eq!(w.first, true); w.append(CounterAddOp(1)); assert_eq!(w.oplog.len(), 0); @@ -580,7 +593,7 @@ mod tests { #[test] fn take_test() { // publish twice then take with no pending operations - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); w.publish(); w.append(CounterAddOp(1)); @@ -588,7 +601,7 @@ mod tests { assert_eq!(*w.take(), 4); // publish twice then pending operation published by take - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); w.publish(); w.append(CounterAddOp(1)); @@ -597,29 +610,30 @@ mod tests { assert_eq!(*w.take(), 6); // normal publish then pending operations published by take - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); w.publish(); w.append(CounterAddOp(1)); assert_eq!(*w.take(), 4); // pending operations published by take - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); assert_eq!(*w.take(), 3); // emptry op queue - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); w.publish(); assert_eq!(*w.take(), 3); // no operations - let (w, _r) = crate::new_from_empty::(2); + let (w, _r) = crate::new_from_empty_with_yield::(2, test_yield); assert_eq!(*w.take(), 2); } #[test] + #[cfg(feature = "std")] fn wait_test() { use std::sync::{Arc, Barrier}; use std::thread; @@ -627,9 +641,9 @@ mod tests { // Case 1: If epoch is set to default. let test_epochs: crate::Epochs = Default::default(); - let mut test_epochs = test_epochs.lock().unwrap(); + let test_snapshot = test_epochs.snapshot(); // since there is no epoch to waiting for, wait function will return immediately. - w.wait(&mut test_epochs); + w.wait(&test_snapshot); // Case 2: If one of the reader is still reading(epoch is odd and count is same as in last_epoch) // and wait has been called. 
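Because `WriteHandle` still derefs to a `ReadHandle`, single-threaded callers do not need a separate reader at all. A brief sketch, reusing the counter `Absorb` impl and `noop_yield` stub from the earlier examples (`writer_reads_its_own_data` is an illustrative name):

```rust
fn writer_reads_its_own_data() {
    let (mut w, _r) = left_right::new_from_empty_with_yield(0i32, noop_yield);

    w.append(CounterAddOp(3));
    w.publish();

    // Deref<Target = ReadHandle<T>> lets the writer observe the published copy.
    assert_eq!(*w.enter().unwrap(), 3);

    // take() consumes the writer, applies any pending ops, waits for readers
    // (calling `yield_fn` while it spins), and hands back the data.
    w.append(CounterAddOp(4));
    assert_eq!(*w.take(), 7);
}
```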
@@ -650,11 +664,15 @@ mod tests { assert_eq!(false, is_waiting_v); let barrier2 = Arc::clone(&barrier); - let test_epochs = Arc::new(Mutex::new(epochs_slab)); + let test_epochs: crate::Epochs = Default::default(); + // We need to reverse the iterator here because when using `extend` it inserts the Items in reverse + test_epochs.extend(epochs_slab.into_iter().map(|(_, tmp)| tmp).rev()); + assert_eq!(3, test_epochs.snapshot().iter().count()); + let wait_handle = thread::spawn(move || { barrier2.wait(); - let mut test_epochs = test_epochs.lock().unwrap(); - w.wait(&mut test_epochs); + let test_epochs_snapshot = test_epochs.snapshot(); + w.wait(&test_epochs_snapshot); }); barrier.wait(); @@ -673,7 +691,7 @@ mod tests { #[test] fn flush_noblock() { - let (mut w, r) = crate::new::(); + let (mut w, r) = crate::new_with_yield::(test_yield); w.append(CounterAddOp(42)); w.publish(); assert_eq!(*r.enter().unwrap(), 42); @@ -687,7 +705,7 @@ mod tests { #[test] fn flush_no_refresh() { - let (mut w, _) = crate::new::(); + let (mut w, _) = crate::new_with_yield::(test_yield); // Until we refresh, writes are written directly instead of going to the // oplog (because there can't be any readers on the w_handle table).
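Taken together, the constructor and epoch-tracking changes leave the existing publish semantics intact. A closing sketch of what readers observe around `publish`, in the spirit of the `flush_noblock` test above (same assumed `Absorb` impl and `noop_yield` stub as in the earlier examples; `publish_visibility` is an illustrative name):

```rust
fn publish_visibility() {
    let (mut w, r) = left_right::new_from_empty_with_yield(0i32, noop_yield);

    w.append(CounterAddOp(42));
    w.publish();
    assert_eq!(*r.enter().unwrap(), 42);

    // Ops appended after a publish sit in the oplog and stay invisible to
    // readers until the next publish.
    w.append(CounterAddOp(1));
    assert_eq!(*r.enter().unwrap(), 42);

    w.publish();
    assert_eq!(*r.enter().unwrap(), 43);
}
```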