Skip to content

Commit

Permalink
Bump to event-listener v3.0.0
Browse files Browse the repository at this point in the history
This commit makes async-channel use the new release of event-listener.
Highlights include a marked increase in efficiency and no_std support.

Supersedes #54

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Sep 16, 2023
1 parent 4cae9cb commit 51ab127
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 97 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ exclude = ["/.*"]
[dependencies]
concurrent-queue = "2"
event-listener = "2.4.0"
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" }
futures-core = "0.3.5"

[dev-dependencies]
easy-parallel = "3"
futures-lite = "1"

[patch.crates-io]
event-listener = { git = "https://github.com/smol-rs/event-listener" }
155 changes: 58 additions & 97 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use std::usize;

use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
use futures_core::stream::Stream;

struct Channel<T> {
Expand Down Expand Up @@ -240,11 +241,11 @@ impl<T> Sender<T> {
/// # });
/// ```
pub fn send(&self, msg: T) -> Send<'_, T> {
Send {
Send::_new(SendInner {
sender: self,
listener: None,
msg: Some(msg),
}
})
}

/// Sends a message into this channel using the blocking strategy.
Expand Down Expand Up @@ -485,7 +486,11 @@ pub struct Receiver<T> {
channel: Arc<Channel<T>>,

/// Listens for a send or close event to unblock this stream.
listener: Option<EventListener>,
///
/// TODO: This is pinned and boxed because `Receiver<T>` is `Unpin` and the newest version
/// of `event_listener::EventListener` is not. At the next major release, we can remove the
/// `Pin<Box<>>` and make `Receiver<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
}

impl<T> Receiver<T> {
Expand Down Expand Up @@ -546,10 +551,10 @@ impl<T> Receiver<T> {
/// # });
/// ```
pub fn recv(&self) -> Recv<'_, T> {
Recv {
Recv::_new(RecvInner {
receiver: self,
listener: None,
}
})
}

/// Receives a message from the channel using the blocking strategy.
Expand Down Expand Up @@ -1059,20 +1064,34 @@ impl fmt::Display for TryRecvError {
}
}

/// A future returned by [`Sender::send()`].
easy_wrapper! {
/// A future returned by [`Sender::send()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
pub(crate) wait();
}

#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, T> {
struct SendInner<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
/// TODO: This is pinned and boxed because `Send<T>` is `Unpin` and the newest version of
/// `event_listener::EventListener` is not. At the next breaking release of this crate, we can
/// remove the `Pin<Box<>>` and make `Send<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
msg: Option<T>,
}

impl<'a, T> Send<'a, T> {
impl<'a, T> Unpin for SendInner<'a, T> {}

impl<'a, T> EventListenerFuture for SendInner<'a, T> {
type Output = Result<(), SendError<T>>;

/// Run this future with the given `Strategy`.
fn run_with_strategy<S: Strategy>(
&mut self,
cx: &mut S::Context,
fn poll_with_strategy<'x, S: Strategy<'x>>(
mut self: Pin<&'x mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Result<(), SendError<T>>> {
loop {
let msg = self.msg.take().unwrap();
Expand All @@ -1084,55 +1103,50 @@ impl<'a, T> Send<'a, T> {
}

// Sending failed - now start listening for notifications or wait for one.
match self.listener.take() {
match self.listener.as_mut() {
None => {
// Start listening and then try sending again.
self.listener = Some(self.sender.channel.send_ops.listen());
}
Some(l) => {
// Poll using the given strategy
if let Err(l) = S::poll(l, cx) {
self.listener = Some(l);
if let Poll::Pending = S::poll(strategy, l.as_mut(), context) {
return Poll::Pending;
} else {
self.listener = None;
}
}
}
}
}

/// Run using the blocking strategy.
fn wait(mut self) -> Result<(), SendError<T>> {
match self.run_with_strategy::<Blocking>(&mut ()) {
Poll::Ready(res) => res,
Poll::Pending => unreachable!(),
}
}
}

impl<'a, T> Unpin for Send<'a, T> {}

impl<'a, T> Future for Send<'a, T> {
type Output = Result<(), SendError<T>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.run_with_strategy::<NonBlocking<'_>>(cx)
}
easy_wrapper! {
/// A future returned by [`Receiver::recv()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
pub(crate) wait();
}

/// A future returned by [`Receiver::recv()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Recv<'a, T> {
struct RecvInner<'a, T> {
receiver: &'a Receiver<T>,
listener: Option<EventListener>,
/// TODO: This is pinned and boxed because `Recv<T>` is `Unpin` and the newest version of
/// `event_listener::EventListener` is not. At the next breaking release of this crate, we can
/// remove the `Pin<Box<>>` and make `Recv<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
}

impl<'a, T> Unpin for Recv<'a, T> {}
impl<'a, T> Unpin for RecvInner<'a, T> {}

impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
type Output = Result<T, RecvError>;

impl<'a, T> Recv<'a, T> {
/// Run this future with the given `Strategy`.
fn run_with_strategy<S: Strategy>(
&mut self,
fn poll_with_strategy<'x, S: Strategy<'x>>(
mut self: Pin<&'x mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
loop {
Expand All @@ -1144,73 +1158,20 @@ impl<'a, T> Recv<'a, T> {
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.take() {
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
self.listener = Some(self.receiver.channel.recv_ops.listen());
}
Some(l) => {
// Poll using the given strategy.
if let Err(l) = S::poll(l, cx) {
self.listener = Some(l);
if let Poll::Pending = S::poll(strategy, l.as_mut(), cx) {
return Poll::Pending;
} else {
self.listener = None;
}
}
}
}
}

/// Run with the blocking strategy.
fn wait(mut self) -> Result<T, RecvError> {
match self.run_with_strategy::<Blocking>(&mut ()) {
Poll::Ready(res) => res,
Poll::Pending => unreachable!(),
}
}
}

impl<'a, T> Future for Recv<'a, T> {
type Output = Result<T, RecvError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.run_with_strategy::<NonBlocking<'_>>(cx)
}
}

/// A strategy used to poll an `EventListener`.
trait Strategy {
/// Context needed to be provided to the `poll` method.
type Context;

/// Polls the given `EventListener`.
///
/// Returns the `EventListener` back if it was not completed; otherwise,
/// returns `Ok(())`.
fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>;
}

/// Non-blocking strategy for use in asynchronous code.
struct NonBlocking<'a>(&'a mut ());

impl<'a> Strategy for NonBlocking<'a> {
type Context = Context<'a>;

fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> {
match Pin::new(&mut evl).poll(cx) {
Poll::Ready(()) => Ok(()),
Poll::Pending => Err(evl),
}
}
}

/// Blocking strategy for use in synchronous code.
struct Blocking;

impl Strategy for Blocking {
type Context = ();

fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> {
evl.wait();
Ok(())
}
}

0 comments on commit 51ab127

Please sign in to comment.