Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add scheduling options for listener lists. #149

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 58 additions & 37 deletions src/intrusive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::Ordering;
use crate::sync::cell::{Cell, UnsafeCell};
use crate::{RegisterResult, State, TaskRef};
use crate::{QueueStrategy, RegisterResult, State, TaskRef};

#[cfg(feature = "critical-section")]
use core::cell::RefCell;
Expand Down Expand Up @@ -42,17 +42,21 @@ struct Inner<T> {

/// The number of notified listeners.
notified: usize,

/// Strategy by which the list is organized.
strategy: QueueStrategy,
}

impl<T> List<T> {
/// Create a new, empty event listener list.
pub(super) fn new() -> Self {
pub(super) fn new(strategy: QueueStrategy) -> Self {
let inner = Inner {
head: None,
tail: None,
next: None,
len: 0,
notified: 0,
strategy,
};

#[cfg(feature = "critical-section")]
Expand Down Expand Up @@ -149,39 +153,9 @@ impl<T> crate::Inner<T> {
})
}

/// Add a new listener to the list.
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
self.with_inner(|inner| {
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}

// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
inner.next = inner.tail;
}

// Bump the entry count.
inner.len += 1;
});
/// Adds a listener to the list.
pub(crate) fn insert(&self, listener: Pin<&mut Option<Listener<T>>>) {
self.with_inner(|inner| inner.insert(listener))
}

/// Remove a listener from the list.
Expand Down Expand Up @@ -248,6 +222,53 @@ impl<T> crate::Inner<T> {
}

impl<T> Inner<T> {
fn insert(&mut self, mut listener: Pin<&mut Option<Listener<T>>>) {
use QueueStrategy::{Fifo, Lifo};

listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(self.tail.filter(|_| self.strategy == Fifo)),
next: Cell::new(self.head.filter(|_| self.strategy == Lifo)),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the head or tail with the new entry.
let replacing = match self.strategy {
Lifo => &mut self.head,
Fifo => &mut self.tail,
};

match mem::replace(replacing, Some(entry.into())) {
None => *replacing = Some(entry.into()),
Some(t) if self.strategy == Lifo => unsafe {
t.as_ref().prev.set(Some(entry.into()))
},
Some(t) if self.strategy == Fifo => unsafe {
t.as_ref().next.set(Some(entry.into()))
},
Some(_) => unimplemented!("unimplemented queue strategy"),
};
}

// If there are no unnotified entries, or if using LIFO strategy, this is the first one.
if self.strategy == Lifo {
self.next = self.head;
} else if self.next.is_none() {
self.next = self.tail;
}

// Bump the entry count.
self.len += 1;
}

fn remove(
&mut self,
mut listener: Pin<&mut Option<Listener<T>>>,
Expand Down Expand Up @@ -413,7 +434,7 @@ mod tests {

#[test]
fn insert() {
let inner = crate::Inner::new();
let inner = crate::Inner::new(QueueStrategy::Fifo);
make_listeners!(listen1, listen2, listen3);

// Register the listeners.
Expand All @@ -434,7 +455,7 @@ mod tests {

#[test]
fn drop_non_notified() {
let inner = crate::Inner::new();
let inner = crate::Inner::new(QueueStrategy::Fifo);
make_listeners!(listen1, listen2, listen3);

// Register the listeners.
Expand Down
48 changes: 44 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ use sync::WithMut;
use notify::NotificationPrivate;
pub use notify::{IntoNotification, Notification};

/// Queuing strategy for listeners.
#[derive(Clone, Copy, Debug, PartialEq)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer a name more descriptive than Sched. Maybe QueueStrategy?

pub enum QueueStrategy {
/// First-in-first-out listeners are added to the back of the list.
Fifo,

/// Last-in-first-out listeners are added to the front of the list.
Lifo,
}

/// Inner state of [`Event`].
struct Inner<T> {
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
Expand All @@ -143,10 +153,10 @@ struct Inner<T> {
}

impl<T> Inner<T> {
fn new() -> Self {
fn new(queue_strategy: QueueStrategy) -> Self {
Self {
notified: AtomicUsize::new(usize::MAX),
list: sys::List::new(),
list: sys::List::new(queue_strategy),
}
}
}
Expand Down Expand Up @@ -177,6 +187,11 @@ pub struct Event<T = ()> {
/// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
/// reference count.
inner: AtomicPtr<Inner<T>>,

/// Queuing strategy.
///
/// Listeners waiting for notification will be arranged according to the strategy.
queue_strategy: QueueStrategy,
}

unsafe impl<T: Send> Send for Event<T> {}
Expand Down Expand Up @@ -238,13 +253,15 @@ impl<T> Event<T> {
pub const fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
queue_strategy: QueueStrategy::Fifo,
}
}
#[cfg(all(feature = "std", loom))]
#[inline]
pub fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
queue_strategy: QueueStrategy::Fifo,
}
}

Expand Down Expand Up @@ -471,7 +488,7 @@ impl<T> Event<T> {
// If this is the first use, initialize the state.
if inner.is_null() {
// Allocate the state on the heap.
let new = Arc::new(Inner::<T>::new());
let new = Arc::new(Inner::<T>::new(self.queue_strategy));

// Convert the state to a raw pointer.
let new = Arc::into_raw(new) as *mut Inner<T>;
Expand Down Expand Up @@ -556,16 +573,39 @@ impl Event<()> {
#[inline]
#[cfg(not(loom))]
pub const fn new() -> Self {
Self::new_with_queue_strategy(QueueStrategy::Fifo)
}

#[inline]
#[cfg(loom)]
pub fn new() -> Self {
Self::new_with_queue_strategy(QueueStrategy::Fifo)
}

/// Creates a new [`Event`] with specific queue strategy.
///
/// # Examples
///
/// ```
/// use event_listener::{Event, QueueStrategy};
///
/// let event = Event::new_with_queue_strategy(QueueStrategy::Fifo);
/// ```
#[inline]
#[cfg(not(loom))]
pub const fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
queue_strategy,
}
}

#[inline]
#[cfg(loom)]
pub fn new() -> Self {
pub fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
queue_strategy,
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/slab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
use crate::sync::Arc;
use crate::{RegisterResult, State, Task, TaskRef};
use crate::{QueueStrategy, RegisterResult, State, Task, TaskRef};

use core::fmt;
use core::marker::PhantomData;
Expand Down Expand Up @@ -229,7 +229,12 @@ pub(crate) struct List<T> {
}

impl<T> List<T> {
pub(super) fn new() -> List<T> {
pub(super) fn new(strategy: QueueStrategy) -> List<T> {
debug_assert!(
strategy == QueueStrategy::Fifo,
"Slab list only supports FIFO strategy"
);

List {
inner: Mutex::new(ListenerSlab::new()),
queue: concurrent_queue::ConcurrentQueue::unbounded(),
Expand Down Expand Up @@ -1362,7 +1367,7 @@ mod tests {

#[test]
fn uncontended_inner() {
let inner = crate::Inner::new();
let inner = crate::Inner::new(QueueStrategy::Fifo);

// Register two listeners.
let (mut listener1, mut listener2, mut listener3) = (None, None, None);
Expand Down
Loading