diff --git a/src/intrusive.rs b/src/intrusive.rs index 69f460b..9d6fb45 100644 --- a/src/intrusive.rs +++ b/src/intrusive.rs @@ -6,7 +6,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::Ordering; use crate::sync::cell::{Cell, UnsafeCell}; -use crate::{RegisterResult, State, TaskRef}; +use crate::{QueueStrategy, RegisterResult, State, TaskRef}; #[cfg(feature = "critical-section")] use core::cell::RefCell; @@ -42,17 +42,21 @@ struct Inner { /// The number of notified listeners. notified: usize, + + /// Strategy by which the list is organized. + strategy: QueueStrategy, } impl List { /// Create a new, empty event listener list. - pub(super) fn new() -> Self { + pub(super) fn new(strategy: QueueStrategy) -> Self { let inner = Inner { head: None, tail: None, next: None, len: 0, notified: 0, + strategy, }; #[cfg(feature = "critical-section")] @@ -149,39 +153,9 @@ impl crate::Inner { }) } - /// Add a new listener to the list. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { - self.with_inner(|inner| { - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); - - { - let entry_guard = listener.link.get(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; - - // Replace the tail with the new entry. - match mem::replace(&mut inner.tail, Some(entry.into())) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, - }; - } - - // If there are no unnotified entries, this is the first one. - if inner.next.is_none() { - inner.next = inner.tail; - } - - // Bump the entry count. - inner.len += 1; - }); + /// Adds a listener to the list. + pub(crate) fn insert(&self, listener: Pin<&mut Option>>) { + self.with_inner(|inner| inner.insert(listener)) } /// Remove a listener from the list. @@ -248,6 +222,53 @@ impl crate::Inner { } impl Inner { + fn insert(&mut self, mut listener: Pin<&mut Option>>) { + use QueueStrategy::{Fifo, Lifo}; + + listener.as_mut().set(Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(self.tail.filter(|_| self.strategy == Fifo)), + next: Cell::new(self.head.filter(|_| self.strategy == Lifo)), + }), + _pin: PhantomPinned, + })); + let listener = listener.as_pin_mut().unwrap(); + + { + let entry_guard = listener.link.get(); + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; + + // Replace the head or tail with the new entry. + let replacing = match self.strategy { + Lifo => &mut self.head, + Fifo => &mut self.tail, + }; + + match mem::replace(replacing, Some(entry.into())) { + None => *replacing = Some(entry.into()), + Some(t) if self.strategy == Lifo => unsafe { + t.as_ref().prev.set(Some(entry.into())) + }, + Some(t) if self.strategy == Fifo => unsafe { + t.as_ref().next.set(Some(entry.into())) + }, + Some(_) => unimplemented!("unimplemented queue strategy"), + }; + } + + // If there are no unnotified entries, or if using LIFO strategy, this is the first one. + if self.strategy == Lifo { + self.next = self.head; + } else if self.next.is_none() { + self.next = self.tail; + } + + // Bump the entry count. + self.len += 1; + } + fn remove( &mut self, mut listener: Pin<&mut Option>>, @@ -413,7 +434,7 @@ mod tests { #[test] fn insert() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. @@ -434,7 +455,7 @@ mod tests { #[test] fn drop_non_notified() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. diff --git a/src/lib.rs b/src/lib.rs index d6a8e44..9bf5d7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -127,6 +127,16 @@ use sync::WithMut; use notify::NotificationPrivate; pub use notify::{IntoNotification, Notification}; +/// Queuing strategy for listeners. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum QueueStrategy { + /// First-in-first-out listeners are added to the back of the list. + Fifo, + + /// Last-in-first-out listeners are added to the front of the list. + Lifo, +} + /// Inner state of [`Event`]. struct Inner { /// The number of notified entries, or `usize::MAX` if all of them have been notified. @@ -143,10 +153,10 @@ struct Inner { } impl Inner { - fn new() -> Self { + fn new(queue_strategy: QueueStrategy) -> Self { Self { notified: AtomicUsize::new(usize::MAX), - list: sys::List::new(), + list: sys::List::new(queue_strategy), } } } @@ -177,6 +187,11 @@ pub struct Event { /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s /// reference count. inner: AtomicPtr>, + + /// Queuing strategy. + /// + /// Listeners waiting for notification will be arranged according to the strategy. + queue_strategy: QueueStrategy, } unsafe impl Send for Event {} @@ -238,6 +253,7 @@ impl Event { pub const fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy: QueueStrategy::Fifo, } } #[cfg(all(feature = "std", loom))] @@ -245,6 +261,7 @@ impl Event { pub fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy: QueueStrategy::Fifo, } } @@ -471,7 +488,7 @@ impl Event { // If this is the first use, initialize the state. if inner.is_null() { // Allocate the state on the heap. - let new = Arc::new(Inner::::new()); + let new = Arc::new(Inner::::new(self.queue_strategy)); // Convert the state to a raw pointer. let new = Arc::into_raw(new) as *mut Inner; @@ -556,16 +573,39 @@ impl Event<()> { #[inline] #[cfg(not(loom))] pub const fn new() -> Self { + Self::new_with_queue_strategy(QueueStrategy::Fifo) + } + + #[inline] + #[cfg(loom)] + pub fn new() -> Self { + Self::new_with_queue_strategy(QueueStrategy::Fifo) + } + + /// Creates a new [`Event`] with specific queue strategy. + /// + /// # Examples + /// + /// ``` + /// use event_listener::{Event, QueueStrategy}; + /// + /// let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); + /// ``` + #[inline] + #[cfg(not(loom))] + pub const fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy, } } #[inline] #[cfg(loom)] - pub fn new() -> Self { + pub fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy, } } diff --git a/src/slab.rs b/src/slab.rs index 59e1c21..11e9a12 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -18,7 +18,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::cell::{Cell, ConstPtr, UnsafeCell}; use crate::sync::Arc; -use crate::{RegisterResult, State, Task, TaskRef}; +use crate::{QueueStrategy, RegisterResult, State, Task, TaskRef}; use core::fmt; use core::marker::PhantomData; @@ -229,7 +229,12 @@ pub(crate) struct List { } impl List { - pub(super) fn new() -> List { + pub(super) fn new(strategy: QueueStrategy) -> List { + debug_assert!( + strategy == QueueStrategy::Fifo, + "Slab list only supports FIFO strategy" + ); + List { inner: Mutex::new(ListenerSlab::new()), queue: concurrent_queue::ConcurrentQueue::unbounded(), @@ -1362,7 +1367,7 @@ mod tests { #[test] fn uncontended_inner() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); // Register two listeners. let (mut listener1, mut listener2, mut listener3) = (None, None, None);