Skip to content

Commit

Permalink
Port to event-listener v5.0.0
Browse files Browse the repository at this point in the history
cc smol-rs/event-listener#105

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull authored and Jules-Bertholet committed Jan 11, 2024
1 parent 47dd439 commit 7e9b1dd
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 65 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,7 @@ waker-fn = "1.1.0"

[target.'cfg(target_family = "wasm")'.dev-dependencies]
wasm-bindgen-test = "0.3"

[patch.crates-io]
event-listener = { git = "https://github.com/smol-rs/event-listener.git", branch = "notgull/break" }
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy.git", branch = "notgull/evl5" }
11 changes: 5 additions & 6 deletions src/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Barrier {
BarrierWait::_new(BarrierWaitInner {
barrier: self,
lock: Some(self.state.lock()),
evl: EventListener::new(),
evl: None,
state: WaitState::Initial,
})
}
Expand Down Expand Up @@ -148,8 +148,7 @@ pin_project_lite::pin_project! {
lock: Option<Lock<'a, State>>,

// An event listener for the `barrier.event` event.
#[pin]
evl: EventListener,
evl: Option<EventListener>,

// The current state of the future.
state: WaitState,
Expand Down Expand Up @@ -200,7 +199,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> {

if state.count < this.barrier.n {
// We need to wait for the event.
this.evl.as_mut().listen(&this.barrier.event);
*this.evl = Some(this.barrier.event.listen());
*this.state = WaitState::Waiting { local_gen };
} else {
// We are the last one.
Expand All @@ -212,7 +211,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> {
}

WaitState::Waiting { local_gen } => {
ready!(strategy.poll(this.evl.as_mut(), cx));
ready!(strategy.poll(this.evl, cx));

// We are now re-acquiring the mutex.
this.lock.as_mut().set(Some(this.barrier.state.lock()));
Expand All @@ -233,7 +232,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> {

if *local_gen == state.generation_id && state.count < this.barrier.n {
// We need to wait for the event again.
this.evl.as_mut().listen(&this.barrier.event);
*this.evl = Some(this.barrier.event.listen());
*this.state = WaitState::Waiting {
local_gen: *local_gen,
};
Expand Down
29 changes: 15 additions & 14 deletions src/mutex.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::borrow::Borrow;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::marker::{PhantomData, PhantomPinned};
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -445,8 +445,7 @@ pin_project_lite::pin_project! {
mutex: Option<B>,

// The event listener waiting on the mutex.
#[pin]
listener: EventListener,
listener: Option<EventListener>,

// The point at which the mutex lock was started.
start: Start,
Expand All @@ -457,6 +456,10 @@ pin_project_lite::pin_project! {
// Capture the `T` lifetime.
#[pin]
_marker: PhantomData<T>,

// Keeping this type `!Unpin` enables future optimizations.
#[pin]
_pin: PhantomPinned
}

impl<T: ?Sized, B: Borrow<Mutex<T>>> PinnedDrop for AcquireSlow<B, T> {
Expand All @@ -477,18 +480,16 @@ impl<T: ?Sized, B: Borrow<Mutex<T>>> AcquireSlow<B, T> {
/// Create a new `AcquireSlow` future.
#[cold]
fn new(mutex: B) -> Self {
// Create a new instance of the listener.
let listener = { EventListener::new() };

AcquireSlow {
mutex: Some(mutex),
listener,
listener: None,
start: Start {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
start: None,
},
starved: false,
_marker: PhantomData,
_pin: PhantomPinned,
}
}

Expand Down Expand Up @@ -517,7 +518,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
let this = self.as_mut().project();
#[cfg(all(feature = "std", not(target_family = "wasm")))]
let start = *this.start.start.get_or_insert_with(Instant::now);
let mutex = Borrow::<Mutex<T>>::borrow(
Expand All @@ -528,8 +529,8 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow
if !*this.starved {
loop {
// Start listening for events.
if !this.listener.is_listening() {
this.listener.as_mut().listen(&mutex.lock_ops);
if this.listener.is_none() {
*this.listener = Some(mutex.lock_ops.listen());

// Try locking if nobody is being starved.
match mutex
Expand All @@ -547,7 +548,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow
_ => break,
}
} else {
ready!(strategy.poll(this.listener.as_mut(), context));
ready!(strategy.poll(this.listener, context));

// Try locking if nobody is being starved.
match mutex
Expand Down Expand Up @@ -591,9 +592,9 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow

// Fairer locking loop.
loop {
if !this.listener.is_listening() {
if this.listener.is_none() {
// Start listening for events.
this.listener.as_mut().listen(&mutex.lock_ops);
*this.listener = Some(mutex.lock_ops.listen());

// Try locking if nobody else is being starved.
match mutex
Expand All @@ -615,7 +616,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow
}
} else {
// Wait for a notification.
ready!(strategy.poll(this.listener.as_mut(), context));
ready!(strategy.poll(this.listener, context));

// Try acquiring the lock without waiting for others.
if mutex.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
Expand Down
19 changes: 7 additions & 12 deletions src/once_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use core::sync::atomic::{AtomicUsize, Ordering};
#[cfg(all(feature = "std", not(target_family = "wasm")))]
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use event_listener::{Event, EventListener};
use event_listener::{Event, Listener};
use event_listener_strategy::{NonBlocking, Strategy};

/// The current state of the `OnceCell`.
Expand Down Expand Up @@ -274,9 +274,7 @@ impl<T> OnceCell<T> {
}

// Slow path: wait for the value to be initialized.
let listener = EventListener::new();
pin!(listener);
listener.as_mut().listen(&self.passive_waiters);
event_listener::listener!(self.passive_waiters => listener);

// Try again.
if let Some(value) = self.get() {
Expand Down Expand Up @@ -329,9 +327,7 @@ impl<T> OnceCell<T> {
}

// Slow path: wait for the value to be initialized.
let listener = EventListener::new();
pin!(listener);
listener.as_mut().listen(&self.passive_waiters);
event_listener::listener!(self.passive_waiters => listener);

// Try again.
if let Some(value) = self.get() {
Expand Down Expand Up @@ -591,8 +587,7 @@ impl<T> OnceCell<T> {
strategy: &mut impl for<'a> Strategy<'a>,
) -> Result<(), E> {
// The event listener we're currently waiting on.
let event_listener = EventListener::new();
pin!(event_listener);
let mut event_listener = None;

let mut closure = Some(closure);

Expand All @@ -611,10 +606,10 @@ impl<T> OnceCell<T> {
// but we do not have the ability to initialize it.
//
// We need to wait the initialization to complete.
if event_listener.is_listening() {
strategy.wait(event_listener.as_mut()).await;
if let Some(listener) = event_listener.take() {
strategy.wait(listener).await;
} else {
event_listener.as_mut().listen(&self.active_initializers);
event_listener = Some(self.active_initializers.listen());
}
}
State::Uninitialized => {
Expand Down
50 changes: 29 additions & 21 deletions src/rwlock/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! the locking code only once, and also lets us make
//! [`RwLockReadGuard`](super::RwLockReadGuard) covariant in `T`.

use core::marker::PhantomPinned;
use core::mem::forget;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -86,7 +87,8 @@ impl RawRwLock {
RawRead {
lock: self,
state: self.state.load(Ordering::Acquire),
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
}
}

Expand Down Expand Up @@ -161,7 +163,7 @@ impl RawRwLock {
pub(super) fn write(&self) -> RawWrite<'_> {
RawWrite {
lock: self,
no_readers: EventListener::new(),
no_readers: None,
state: WriteState::Acquiring {
lock: self.mutex.lock(),
},
Expand Down Expand Up @@ -193,7 +195,8 @@ impl RawRwLock {

RawUpgrade {
lock: Some(self),
listener: EventListener::new(),
listener: None,
_pin: PhantomPinned,
}
}

Expand Down Expand Up @@ -292,8 +295,11 @@ pin_project_lite::pin_project! {
state: usize,

// The listener for the "no writers" event.
listener: Option<EventListener>,

// Making this type `!Unpin` enables future optimizations.
#[pin]
listener: EventListener,
_pin: PhantomPinned
}
}

Expand All @@ -305,7 +311,7 @@ impl<'a> EventListenerFuture for RawRead<'a> {
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<()> {
let mut this = self.project();
let this = self.project();

loop {
if *this.state & WRITER_BIT == 0 {
Expand All @@ -327,14 +333,14 @@ impl<'a> EventListenerFuture for RawRead<'a> {
}
} else {
// Start listening for "no writer" events.
let load_ordering = if !this.listener.is_listening() {
this.listener.as_mut().listen(&this.lock.no_writer);
let load_ordering = if this.listener.is_none() {
*this.listener = Some(this.lock.no_writer.listen());

// Make sure there really is no writer.
Ordering::SeqCst
} else {
// Wait for the writer to finish.
ready!(strategy.poll(this.listener.as_mut(), cx));
ready!(strategy.poll(this.listener, cx));

// Notify the next reader waiting in list.
this.lock.no_writer.notify(1);
Expand Down Expand Up @@ -409,8 +415,7 @@ pin_project_lite::pin_project! {
pub(super) lock: &'a RawRwLock,

// Our listener for the "no readers" event.
#[pin]
no_readers: EventListener,
no_readers: Option<EventListener>,

// Current state fof this future.
#[pin]
Expand Down Expand Up @@ -473,12 +478,12 @@ impl<'a> EventListenerFuture for RawWrite<'a> {
}

// Start waiting for the readers to finish.
this.no_readers.as_mut().listen(&this.lock.no_readers);
*this.no_readers = Some(this.lock.no_readers.listen());
this.state.as_mut().set(WriteState::WaitingReaders);
}

WriteStateProj::WaitingReaders => {
let load_ordering = if this.no_readers.is_listening() {
let load_ordering = if this.no_readers.is_some() {
Ordering::Acquire
} else {
Ordering::SeqCst
Expand All @@ -492,12 +497,12 @@ impl<'a> EventListenerFuture for RawWrite<'a> {
}

// Wait for the readers to finish.
if !this.no_readers.is_listening() {
if this.no_readers.is_none() {
// Register a listener.
this.no_readers.as_mut().listen(&this.lock.no_readers);
*this.no_readers = Some(this.lock.no_readers.listen());
} else {
// Wait for the readers to finish.
ready!(strategy.poll(this.no_readers.as_mut(), cx));
ready!(strategy.poll(this.no_readers, cx));
};
}
WriteStateProj::Acquired => panic!("Write lock already acquired"),
Expand All @@ -513,8 +518,11 @@ pin_project_lite::pin_project! {
lock: Option<&'a RawRwLock>,

// The event listener we are waiting on.
listener: Option<EventListener>,

// Keeping this future `!Unpin` enables future optimizations.
#[pin]
listener: EventListener,
_pin: PhantomPinned
}

impl PinnedDrop for RawUpgrade<'_> {
Expand All @@ -539,12 +547,12 @@ impl<'a> EventListenerFuture for RawUpgrade<'a> {
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<&'a RawRwLock> {
let mut this = self.project();
let this = self.project();
let lock = this.lock.expect("cannot poll future after completion");

// If there are readers, we need to wait for them to finish.
loop {
let load_ordering = if this.listener.is_listening() {
let load_ordering = if this.listener.is_some() {
Ordering::Acquire
} else {
Ordering::SeqCst
Expand All @@ -557,12 +565,12 @@ impl<'a> EventListenerFuture for RawUpgrade<'a> {
}

// If there are readers, wait for them to finish.
if !this.listener.is_listening() {
if this.listener.is_none() {
// Start listening for "no readers" events.
this.listener.as_mut().listen(&lock.no_readers);
*this.listener = Some(lock.no_readers.listen());
} else {
// Wait for the readers to finish.
ready!(strategy.poll(this.listener.as_mut(), cx));
ready!(strategy.poll(this.listener, cx));
};
}

Expand Down
Loading

0 comments on commit 7e9b1dd

Please sign in to comment.