Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump to event-listener v3.0.0 #59

Merged
merged 4 commits into from
Sep 16, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix test errors and make futures !Unpin
This is a breaking change. However, it comes with the ability to avoid
heap allocations in many cases, which is a significant boon for users
for async-channel.

Signed-off-by: John Nunley <dev@notgull.net>
notgull committed Sep 16, 2023
commit 6d99af0ef40965d3a54e84a77124144326b0552c
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ concurrent-queue = "2"
event-listener = "2.4.0"
event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" }
futures-core = "0.3.5"
pin-project-lite = "0.2.11"

[dev-dependencies]
easy-parallel = "3"
185 changes: 88 additions & 97 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ use std::usize;
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
use futures_core::ready;
use futures_core::stream::Stream;

struct Channel<T> {
@@ -129,8 +130,8 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(&channel.stream_ops),
channel,
listener: None,
};
(s, r)
}
@@ -169,8 +170,8 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(&channel.stream_ops),
channel,
listener: None,
};
(s, r)
}
@@ -243,8 +244,8 @@ impl<T> Sender<T> {
pub fn send(&self, msg: T) -> Send<'_, T> {
Send::_new(SendInner {
sender: self,
listener: None,
msg: Some(msg),
listener: EventListener::new(&self.channel.send_ops),
})
}

@@ -473,24 +474,34 @@ impl<T> Clone for Sender<T> {
}
}

/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
/// are dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Receiver::close()`].
///
/// Receivers implement the [`Stream`] trait.
pub struct Receiver<T> {
/// Inner channel state.
channel: Arc<Channel<T>>,

/// Listens for a send or close event to unblock this stream.
pin_project_lite::pin_project! {
/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
/// are dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Receiver::close()`].
///
/// TODO: This is pinned and boxed because `Receiver<T>` is `Unpin` and the newest version
/// of `event_listener::EventListener` is not. At the next major release, we can remove the
/// `Pin<Box<>>` and make `Receiver<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
/// Receivers implement the [`Stream`] trait.
pub struct Receiver<T> {
// Inner channel state.
channel: Arc<Channel<T>>,

// Listens for a send or close event to unblock this stream.
#[pin]
listener: EventListener,
}

impl<T> PinnedDrop for Receiver<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();

// Decrement the receiver count and close the channel if it drops down to zero.
if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
this.channel.close();
}
}
}
}

impl<T> Receiver<T> {
@@ -553,7 +564,7 @@ impl<T> Receiver<T> {
pub fn recv(&self) -> Recv<'_, T> {
Recv::_new(RecvInner {
receiver: self,
listener: None,
listener: EventListener::new(&self.channel.recv_ops),
})
}

@@ -755,15 +766,6 @@ impl<T> Receiver<T> {
}
}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// Decrement the receiver count and close the channel if it drops down to zero.
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.close();
}
}
}

impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Receiver {{ .. }}")
@@ -781,7 +783,7 @@ impl<T> Clone for Receiver<T> {

Receiver {
channel: self.channel.clone(),
listener: None,
listener: EventListener::new(&self.channel.stream_ops),
}
}
}
@@ -792,37 +794,42 @@ impl<T> Stream for Receiver<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
futures_core::ready!(Pin::new(listener).poll(cx));
self.listener = None;
{
let this = self.as_mut().project();
if this.listener.is_listening() {
ready!(this.listener.poll(cx));
}
}

loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
let mut this = self.project();
this.listener
.as_mut()
.set(EventListener::new(&this.channel.stream_ops));
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
let mut this = self.project();
this.listener
.as_mut()
.set(EventListener::new(&this.channel.stream_ops));
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Create a listener and try sending the message again.
self.listener = Some(self.channel.stream_ops.listen());
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
let mut this = self.as_mut().project();
if this.listener.is_listening() {
// Go back to the outer loop to wait for a notification.
break;
} else {
this.listener.as_mut().listen();
}
}
}
@@ -907,7 +914,7 @@ impl<T> WeakReceiver<T> {
}
Ok(_) => Some(Receiver {
channel: self.channel.clone(),
listener: None,
listener: EventListener::new(&self.channel.stream_ops),
}),
}
}
@@ -1072,50 +1079,42 @@ easy_wrapper! {
pub(crate) wait();
}

#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
/// TODO: This is pinned and boxed because `Send<T>` is `Unpin` and the newest version of
/// `event_listener::EventListener` is not. At the next breaking release of this crate, we can
/// remove the `Pin<Box<>>` and make `Send<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
msg: Option<T>,
pin_project_lite::pin_project! {
#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
msg: Option<T>,
#[pin]
listener: EventListener,
}
}

impl<'a, T> Unpin for SendInner<'a, T> {}

impl<'a, T> EventListenerFuture for SendInner<'a, T> {
type Output = Result<(), SendError<T>>;

/// Run this future with the given `Strategy`.
fn poll_with_strategy<'x, S: Strategy<'x>>(
mut self: Pin<&'x mut Self>,
self: Pin<&'x mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Result<(), SendError<T>>> {
let mut this = self.project();

loop {
let msg = self.msg.take().unwrap();
let msg = this.msg.take().unwrap();
// Attempt to send a message.
match self.sender.try_send(msg) {
match this.sender.try_send(msg) {
Ok(()) => return Poll::Ready(Ok(())),
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => self.msg = Some(m),
Err(TrySendError::Full(m)) => *this.msg = Some(m),
}

// Sending failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try sending again.
self.listener = Some(self.sender.channel.send_ops.listen());
}
Some(l) => {
// Poll using the given strategy
if let Poll::Pending = S::poll(strategy, l.as_mut(), context) {
return Poll::Pending;
} else {
self.listener = None;
}
}
if this.listener.is_listening() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), context));
} else {
this.listener.as_mut().listen();
}
}
}
@@ -1129,48 +1128,40 @@ easy_wrapper! {
pub(crate) wait();
}

#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a Receiver<T>,
/// TODO: This is pinned and boxed because `Recv<T>` is `Unpin` and the newest version of
/// `event_listener::EventListener` is not. At the next breaking release of this crate, we can
/// remove the `Pin<Box<>>` and make `Recv<T>` `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
pin_project_lite::pin_project! {
#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a Receiver<T>,
#[pin]
listener: EventListener,
}
}

impl<'a, T> Unpin for RecvInner<'a, T> {}

impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
type Output = Result<T, RecvError>;

/// Run this future with the given `Strategy`.
fn poll_with_strategy<'x, S: Strategy<'x>>(
mut self: Pin<&'x mut Self>,
self: Pin<&'x mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
let mut this = self.project();

loop {
// Attempt to receive a message.
match self.receiver.try_recv() {
match this.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}

// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
self.listener = Some(self.receiver.channel.recv_ops.listen());
}
Some(l) => {
// Poll using the given strategy.
if let Poll::Pending = S::poll(strategy, l.as_mut(), cx) {
return Poll::Pending;
} else {
self.listener = None;
}
}
if this.listener.is_listening() {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), cx));
} else {
this.listener.as_mut().listen();
}
}
}
11 changes: 7 additions & 4 deletions tests/bounded.rs
Original file line number Diff line number Diff line change
@@ -332,9 +332,10 @@ fn forget_blocked_sender() {
.add(move || {
assert!(future::block_on(s1.send(3)).is_ok());
assert!(future::block_on(s1.send(7)).is_ok());
let mut s1_fut = s1.send(13);
let s1_fut = s1.send(13);
futures_lite::pin!(s1_fut);
// Poll but keep the future alive.
assert_eq!(future::block_on(future::poll_once(&mut s1_fut)), None);
assert_eq!(future::block_on(future::poll_once(s1_fut)), None);
sleep(ms(500));
})
.add(move || {
@@ -358,8 +359,9 @@ fn forget_blocked_receiver() {

Parallel::new()
.add(move || {
let mut r1_fut = r1.recv();
let r1_fut = r1.recv();
// Poll but keep the future alive.
futures_lite::pin!(r1_fut);
assert_eq!(future::block_on(future::poll_once(&mut r1_fut)), None);
sleep(ms(500));
})
@@ -436,8 +438,9 @@ fn mpmc_stream() {

Parallel::new()
.each(0..THREADS, {
let mut r = r;
let r = r;
move |_| {
futures_lite::pin!(r);
for _ in 0..COUNT {
let n = future::block_on(r.next()).unwrap();
v[n].fetch_add(1, Ordering::SeqCst);
3 changes: 2 additions & 1 deletion tests/unbounded.rs
Original file line number Diff line number Diff line change
@@ -295,8 +295,9 @@ fn mpmc_stream() {

Parallel::new()
.each(0..THREADS, {
let mut r = r.clone();
let r = r.clone();
move |_| {
futures_lite::pin!(r);
for _ in 0..COUNT {
let n = future::block_on(r.next()).unwrap();
v[n].fetch_add(1, Ordering::SeqCst);