From 51ab1273b46e29f88135b10629787d001b32d473 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sat, 20 May 2023 13:35:01 -0700 Subject: [PATCH 1/4] Bump to event-listener v3.0.0 This commit makes async-channel use the new release of event-listener. Highlights include a marked increase in efficiency and no_std support. Supersedes #54 Signed-off-by: John Nunley --- Cargo.toml | 4 ++ src/lib.rs | 155 ++++++++++++++++++++--------------------------------- 2 files changed, 62 insertions(+), 97 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5c4eea8..22ed1de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,12 @@ exclude = ["/.*"] [dependencies] concurrent-queue = "2" event-listener = "2.4.0" +event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" } futures-core = "0.3.5" [dev-dependencies] easy-parallel = "3" futures-lite = "1" + +[patch.crates-io] +event-listener = { git = "https://github.com/smol-rs/event-listener" } diff --git a/src/lib.rs b/src/lib.rs index a92f38b..440e59e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,6 +47,7 @@ use std::usize; use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use event_listener::{Event, EventListener}; +use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy}; use futures_core::stream::Stream; struct Channel { @@ -240,11 +241,11 @@ impl Sender { /// # }); /// ``` pub fn send(&self, msg: T) -> Send<'_, T> { - Send { + Send::_new(SendInner { sender: self, listener: None, msg: Some(msg), - } + }) } /// Sends a message into this channel using the blocking strategy. @@ -485,7 +486,11 @@ pub struct Receiver { channel: Arc>, /// Listens for a send or close event to unblock this stream. - listener: Option, + /// + /// TODO: This is pinned and boxed because `Receiver` is `Unpin` and the newest version + /// of `event_listener::EventListener` is not. At the next major release, we can remove the + /// `Pin>` and make `Receiver` `!Unpin`. + listener: Option>>, } impl Receiver { @@ -546,10 +551,10 @@ impl Receiver { /// # }); /// ``` pub fn recv(&self) -> Recv<'_, T> { - Recv { + Recv::_new(RecvInner { receiver: self, listener: None, - } + }) } /// Receives a message from the channel using the blocking strategy. @@ -1059,20 +1064,34 @@ impl fmt::Display for TryRecvError { } } -/// A future returned by [`Sender::send()`]. +easy_wrapper! { + /// A future returned by [`Sender::send()`]. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError>); + pub(crate) wait(); +} + #[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Send<'a, T> { +struct SendInner<'a, T> { sender: &'a Sender, - listener: Option, + /// TODO: This is pinned and boxed because `Send` is `Unpin` and the newest version of + /// `event_listener::EventListener` is not. At the next breaking release of this crate, we can + /// remove the `Pin>` and make `Send` `!Unpin`. + listener: Option>>, msg: Option, } -impl<'a, T> Send<'a, T> { +impl<'a, T> Unpin for SendInner<'a, T> {} + +impl<'a, T> EventListenerFuture for SendInner<'a, T> { + type Output = Result<(), SendError>; + /// Run this future with the given `Strategy`. - fn run_with_strategy( - &mut self, - cx: &mut S::Context, + fn poll_with_strategy<'x, S: Strategy<'x>>( + mut self: Pin<&'x mut Self>, + strategy: &mut S, + context: &mut S::Context, ) -> Poll>> { loop { let msg = self.msg.take().unwrap(); @@ -1084,55 +1103,50 @@ impl<'a, T> Send<'a, T> { } // Sending failed - now start listening for notifications or wait for one. - match self.listener.take() { + match self.listener.as_mut() { None => { // Start listening and then try sending again. self.listener = Some(self.sender.channel.send_ops.listen()); } Some(l) => { // Poll using the given strategy - if let Err(l) = S::poll(l, cx) { - self.listener = Some(l); + if let Poll::Pending = S::poll(strategy, l.as_mut(), context) { return Poll::Pending; + } else { + self.listener = None; } } } } } - - /// Run using the blocking strategy. - fn wait(mut self) -> Result<(), SendError> { - match self.run_with_strategy::(&mut ()) { - Poll::Ready(res) => res, - Poll::Pending => unreachable!(), - } - } } -impl<'a, T> Unpin for Send<'a, T> {} - -impl<'a, T> Future for Send<'a, T> { - type Output = Result<(), SendError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.run_with_strategy::>(cx) - } +easy_wrapper! { + /// A future returned by [`Receiver::recv()`]. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Recv<'a, T>(RecvInner<'a, T> => Result); + pub(crate) wait(); } -/// A future returned by [`Receiver::recv()`]. #[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Recv<'a, T> { +struct RecvInner<'a, T> { receiver: &'a Receiver, - listener: Option, + /// TODO: This is pinned and boxed because `Recv` is `Unpin` and the newest version of + /// `event_listener::EventListener` is not. At the next breaking release of this crate, we can + /// remove the `Pin>` and make `Recv` `!Unpin`. + listener: Option>>, } -impl<'a, T> Unpin for Recv<'a, T> {} +impl<'a, T> Unpin for RecvInner<'a, T> {} + +impl<'a, T> EventListenerFuture for RecvInner<'a, T> { + type Output = Result; -impl<'a, T> Recv<'a, T> { /// Run this future with the given `Strategy`. - fn run_with_strategy( - &mut self, + fn poll_with_strategy<'x, S: Strategy<'x>>( + mut self: Pin<&'x mut Self>, + strategy: &mut S, cx: &mut S::Context, ) -> Poll> { loop { @@ -1144,73 +1158,20 @@ impl<'a, T> Recv<'a, T> { } // Receiving failed - now start listening for notifications or wait for one. - match self.listener.take() { + match self.listener.as_mut() { None => { // Start listening and then try receiving again. self.listener = Some(self.receiver.channel.recv_ops.listen()); } Some(l) => { // Poll using the given strategy. - if let Err(l) = S::poll(l, cx) { - self.listener = Some(l); + if let Poll::Pending = S::poll(strategy, l.as_mut(), cx) { return Poll::Pending; + } else { + self.listener = None; } } } } } - - /// Run with the blocking strategy. - fn wait(mut self) -> Result { - match self.run_with_strategy::(&mut ()) { - Poll::Ready(res) => res, - Poll::Pending => unreachable!(), - } - } -} - -impl<'a, T> Future for Recv<'a, T> { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.run_with_strategy::>(cx) - } -} - -/// A strategy used to poll an `EventListener`. -trait Strategy { - /// Context needed to be provided to the `poll` method. - type Context; - - /// Polls the given `EventListener`. - /// - /// Returns the `EventListener` back if it was not completed; otherwise, - /// returns `Ok(())`. - fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>; -} - -/// Non-blocking strategy for use in asynchronous code. -struct NonBlocking<'a>(&'a mut ()); - -impl<'a> Strategy for NonBlocking<'a> { - type Context = Context<'a>; - - fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> { - match Pin::new(&mut evl).poll(cx) { - Poll::Ready(()) => Ok(()), - Poll::Pending => Err(evl), - } - } -} - -/// Blocking strategy for use in synchronous code. -struct Blocking; - -impl Strategy for Blocking { - type Context = (); - - fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> { - evl.wait(); - Ok(()) - } } From 6d99af0ef40965d3a54e84a77124144326b0552c Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 6 Aug 2023 11:55:32 -0700 Subject: [PATCH 2/4] Fix test errors and make futures !Unpin This is a breaking change. However, it comes with the ability to avoid heap allocations in many cases, which is a significant boon for users for async-channel. Signed-off-by: John Nunley --- Cargo.toml | 1 + src/lib.rs | 185 +++++++++++++++++++++------------------------ tests/bounded.rs | 11 ++- tests/unbounded.rs | 3 +- 4 files changed, 98 insertions(+), 102 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 22ed1de..57fff5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ concurrent-queue = "2" event-listener = "2.4.0" event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" } futures-core = "0.3.5" +pin-project-lite = "0.2.11" [dev-dependencies] easy-parallel = "3" diff --git a/src/lib.rs b/src/lib.rs index 440e59e..d3a6083 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ use std::usize; use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use event_listener::{Event, EventListener}; use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy}; +use futures_core::ready; use futures_core::stream::Stream; struct Channel { @@ -129,8 +130,8 @@ pub fn bounded(cap: usize) -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { + listener: EventListener::new(&channel.stream_ops), channel, - listener: None, }; (s, r) } @@ -169,8 +170,8 @@ pub fn unbounded() -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { + listener: EventListener::new(&channel.stream_ops), channel, - listener: None, }; (s, r) } @@ -243,8 +244,8 @@ impl Sender { pub fn send(&self, msg: T) -> Send<'_, T> { Send::_new(SendInner { sender: self, - listener: None, msg: Some(msg), + listener: EventListener::new(&self.channel.send_ops), }) } @@ -473,24 +474,34 @@ impl Clone for Sender { } } -/// The receiving side of a channel. -/// -/// Receivers can be cloned and shared among threads. When all receivers associated with a channel -/// are dropped, the channel becomes closed. -/// -/// The channel can also be closed manually by calling [`Receiver::close()`]. -/// -/// Receivers implement the [`Stream`] trait. -pub struct Receiver { - /// Inner channel state. - channel: Arc>, - - /// Listens for a send or close event to unblock this stream. +pin_project_lite::pin_project! { + /// The receiving side of a channel. + /// + /// Receivers can be cloned and shared among threads. When all receivers associated with a channel + /// are dropped, the channel becomes closed. + /// + /// The channel can also be closed manually by calling [`Receiver::close()`]. /// - /// TODO: This is pinned and boxed because `Receiver` is `Unpin` and the newest version - /// of `event_listener::EventListener` is not. At the next major release, we can remove the - /// `Pin>` and make `Receiver` `!Unpin`. - listener: Option>>, + /// Receivers implement the [`Stream`] trait. + pub struct Receiver { + // Inner channel state. + channel: Arc>, + + // Listens for a send or close event to unblock this stream. + #[pin] + listener: EventListener, + } + + impl PinnedDrop for Receiver { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + + // Decrement the receiver count and close the channel if it drops down to zero. + if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { + this.channel.close(); + } + } + } } impl Receiver { @@ -553,7 +564,7 @@ impl Receiver { pub fn recv(&self) -> Recv<'_, T> { Recv::_new(RecvInner { receiver: self, - listener: None, + listener: EventListener::new(&self.channel.recv_ops), }) } @@ -755,15 +766,6 @@ impl Receiver { } } -impl Drop for Receiver { - fn drop(&mut self) { - // Decrement the receiver count and close the channel if it drops down to zero. - if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { - self.channel.close(); - } - } -} - impl fmt::Debug for Receiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Receiver {{ .. }}") @@ -781,7 +783,7 @@ impl Clone for Receiver { Receiver { channel: self.channel.clone(), - listener: None, + listener: EventListener::new(&self.channel.stream_ops), } } } @@ -792,9 +794,11 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // If this stream is listening for events, first wait for a notification. - if let Some(listener) = self.listener.as_mut() { - futures_core::ready!(Pin::new(listener).poll(cx)); - self.listener = None; + { + let this = self.as_mut().project(); + if this.listener.is_listening() { + ready!(this.listener.poll(cx)); + } } loop { @@ -802,27 +806,30 @@ impl Stream for Receiver { match self.try_recv() { Ok(msg) => { // The stream is not blocked on an event - drop the listener. - self.listener = None; + let mut this = self.project(); + this.listener + .as_mut() + .set(EventListener::new(&this.channel.stream_ops)); return Poll::Ready(Some(msg)); } Err(TryRecvError::Closed) => { // The stream is not blocked on an event - drop the listener. - self.listener = None; + let mut this = self.project(); + this.listener + .as_mut() + .set(EventListener::new(&this.channel.stream_ops)); return Poll::Ready(None); } Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - match self.listener.as_mut() { - None => { - // Create a listener and try sending the message again. - self.listener = Some(self.channel.stream_ops.listen()); - } - Some(_) => { - // Go back to the outer loop to poll the listener. - break; - } + let mut this = self.as_mut().project(); + if this.listener.is_listening() { + // Go back to the outer loop to wait for a notification. + break; + } else { + this.listener.as_mut().listen(); } } } @@ -907,7 +914,7 @@ impl WeakReceiver { } Ok(_) => Some(Receiver { channel: self.channel.clone(), - listener: None, + listener: EventListener::new(&self.channel.stream_ops), }), } } @@ -1072,50 +1079,42 @@ easy_wrapper! { pub(crate) wait(); } -#[derive(Debug)] -struct SendInner<'a, T> { - sender: &'a Sender, - /// TODO: This is pinned and boxed because `Send` is `Unpin` and the newest version of - /// `event_listener::EventListener` is not. At the next breaking release of this crate, we can - /// remove the `Pin>` and make `Send` `!Unpin`. - listener: Option>>, - msg: Option, +pin_project_lite::pin_project! { + #[derive(Debug)] + struct SendInner<'a, T> { + sender: &'a Sender, + msg: Option, + #[pin] + listener: EventListener, + } } -impl<'a, T> Unpin for SendInner<'a, T> {} - impl<'a, T> EventListenerFuture for SendInner<'a, T> { type Output = Result<(), SendError>; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - mut self: Pin<&'x mut Self>, + self: Pin<&'x mut Self>, strategy: &mut S, context: &mut S::Context, ) -> Poll>> { + let mut this = self.project(); + loop { - let msg = self.msg.take().unwrap(); + let msg = this.msg.take().unwrap(); // Attempt to send a message. - match self.sender.try_send(msg) { + match this.sender.try_send(msg) { Ok(()) => return Poll::Ready(Ok(())), Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), - Err(TrySendError::Full(m)) => self.msg = Some(m), + Err(TrySendError::Full(m)) => *this.msg = Some(m), } // Sending failed - now start listening for notifications or wait for one. - match self.listener.as_mut() { - None => { - // Start listening and then try sending again. - self.listener = Some(self.sender.channel.send_ops.listen()); - } - Some(l) => { - // Poll using the given strategy - if let Poll::Pending = S::poll(strategy, l.as_mut(), context) { - return Poll::Pending; - } else { - self.listener = None; - } - } + if this.listener.is_listening() { + // Poll using the given strategy + ready!(S::poll(strategy, this.listener.as_mut(), context)); + } else { + this.listener.as_mut().listen(); } } } @@ -1129,48 +1128,40 @@ easy_wrapper! { pub(crate) wait(); } -#[derive(Debug)] -struct RecvInner<'a, T> { - receiver: &'a Receiver, - /// TODO: This is pinned and boxed because `Recv` is `Unpin` and the newest version of - /// `event_listener::EventListener` is not. At the next breaking release of this crate, we can - /// remove the `Pin>` and make `Recv` `!Unpin`. - listener: Option>>, +pin_project_lite::pin_project! { + #[derive(Debug)] + struct RecvInner<'a, T> { + receiver: &'a Receiver, + #[pin] + listener: EventListener, + } } -impl<'a, T> Unpin for RecvInner<'a, T> {} - impl<'a, T> EventListenerFuture for RecvInner<'a, T> { type Output = Result; /// Run this future with the given `Strategy`. fn poll_with_strategy<'x, S: Strategy<'x>>( - mut self: Pin<&'x mut Self>, + self: Pin<&'x mut Self>, strategy: &mut S, cx: &mut S::Context, ) -> Poll> { + let mut this = self.project(); + loop { // Attempt to receive a message. - match self.receiver.try_recv() { + match this.receiver.try_recv() { Ok(msg) => return Poll::Ready(Ok(msg)), Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - match self.listener.as_mut() { - None => { - // Start listening and then try receiving again. - self.listener = Some(self.receiver.channel.recv_ops.listen()); - } - Some(l) => { - // Poll using the given strategy. - if let Poll::Pending = S::poll(strategy, l.as_mut(), cx) { - return Poll::Pending; - } else { - self.listener = None; - } - } + if this.listener.is_listening() { + // Poll using the given strategy + ready!(S::poll(strategy, this.listener.as_mut(), cx)); + } else { + this.listener.as_mut().listen(); } } } diff --git a/tests/bounded.rs b/tests/bounded.rs index 0ae4890..abb8895 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -332,9 +332,10 @@ fn forget_blocked_sender() { .add(move || { assert!(future::block_on(s1.send(3)).is_ok()); assert!(future::block_on(s1.send(7)).is_ok()); - let mut s1_fut = s1.send(13); + let s1_fut = s1.send(13); + futures_lite::pin!(s1_fut); // Poll but keep the future alive. - assert_eq!(future::block_on(future::poll_once(&mut s1_fut)), None); + assert_eq!(future::block_on(future::poll_once(s1_fut)), None); sleep(ms(500)); }) .add(move || { @@ -358,8 +359,9 @@ fn forget_blocked_receiver() { Parallel::new() .add(move || { - let mut r1_fut = r1.recv(); + let r1_fut = r1.recv(); // Poll but keep the future alive. + futures_lite::pin!(r1_fut); assert_eq!(future::block_on(future::poll_once(&mut r1_fut)), None); sleep(ms(500)); }) @@ -436,8 +438,9 @@ fn mpmc_stream() { Parallel::new() .each(0..THREADS, { - let mut r = r; + let r = r; move |_| { + futures_lite::pin!(r); for _ in 0..COUNT { let n = future::block_on(r.next()).unwrap(); v[n].fetch_add(1, Ordering::SeqCst); diff --git a/tests/unbounded.rs b/tests/unbounded.rs index e239d34..31d4987 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -295,8 +295,9 @@ fn mpmc_stream() { Parallel::new() .each(0..THREADS, { - let mut r = r.clone(); + let r = r.clone(); move |_| { + futures_lite::pin!(r); for _ in 0..COUNT { let n = future::block_on(r.next()).unwrap(); v[n].fetch_add(1, Ordering::SeqCst); From a8cd074bdadb88a98dd2ca23ade0ee7489e8fe40 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Wed, 16 Aug 2023 20:58:57 -0700 Subject: [PATCH 3/4] Add an std feature This feature can be disabled to allow the crate to be used without the standard library on embedded environments. Closes #64 Signed-off-by: John Nunley --- .github/workflows/ci.yml | 11 +++++++- Cargo.toml | 16 +++++++---- src/lib.rs | 61 +++++++++++++++++++++++++++++----------- tests/bounded.rs | 2 ++ tests/unbounded.rs | 2 ++ 5 files changed, 68 insertions(+), 24 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d62df86..77fd6e2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,6 +42,14 @@ jobs: if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - run: cargo test + - run: cargo test --no-default-features + - name: Install cargo-hack + uses: taiki-e/install-action@cargo-hack + - run: rustup target add thumbv7m-none-eabi + - name: Run cargo check (without dev-dependencies to catch missing feature flags) + run: cargo hack build --all --no-dev-deps + - run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps + - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic msrv: runs-on: ubuntu-latest @@ -49,12 +57,13 @@ jobs: matrix: # When updating this, the reminder to update the minimum supported # Rust version in Cargo.toml. - rust: ['1.45'] + rust: ['1.59'] steps: - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - run: cargo build + - run: cargo build --no-default-features clippy: runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 57fff5c..72983ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-channel" version = "1.9.0" authors = ["Stjepan Glavina "] edition = "2018" -rust-version = "1.45" +rust-version = "1.59" description = "Async multi-producer multi-consumer channel" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-channel" @@ -15,15 +15,19 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -concurrent-queue = "2" -event-listener = "2.4.0" -event-listener-strategy = { git = "https://github.com/smol-rs/event-listener" } -futures-core = "0.3.5" +concurrent-queue = { version = "2", default-features = false } +event-listener = { version = "2.4.0", default-features = false } +event-listener-strategy = { git = "https://github.com/smol-rs/event-listener", default-features = false } +futures-core = { version = "0.3.5", default-features = false } pin-project-lite = "0.2.11" [dev-dependencies] easy-parallel = "3" futures-lite = "1" +[features] +default = ["std"] +std = ["concurrent-queue/std", "event-listener/std", "event-listener-strategy/std"] + [patch.crates-io] -event-listener = { git = "https://github.com/smol-rs/event-listener" } +event-listener = { git = "https://github.com/smol-rs/event-listener", default-features = false } diff --git a/src/lib.rs b/src/lib.rs index d3a6083..0820a88 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ //! # }); //! ``` +#![cfg_attr(not(feature = "std"), no_std)] #![forbid(unsafe_code)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc( @@ -35,15 +36,16 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] -use std::error; -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::process; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::usize; +extern crate alloc; + +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::{Context, Poll}; +use core::usize; + +use alloc::sync::Arc; use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use event_listener::{Event, EventListener}; @@ -274,6 +276,7 @@ impl Sender { /// drop(r); /// assert_eq!(s.send_blocking(2), Err(SendError(2))); /// ``` + #[cfg(feature = "std")] pub fn send_blocking(&self, msg: T) -> Result<(), SendError> { self.send(msg).wait() } @@ -465,7 +468,7 @@ impl Clone for Sender { // Make sure the count never overflows, even if lots of sender clones are leaked. if count > usize::MAX / 2 { - process::abort(); + abort(); } Sender { @@ -596,6 +599,7 @@ impl Receiver { /// assert_eq!(r.recv_blocking(), Ok(1)); /// assert_eq!(r.recv_blocking(), Err(RecvError)); /// ``` + #[cfg(feature = "std")] pub fn recv_blocking(&self) -> Result { self.recv().wait() } @@ -778,7 +782,7 @@ impl Clone for Receiver { // Make sure the count never overflows, even if lots of receiver clones are leaked. if count > usize::MAX / 2 { - process::abort(); + abort(); } Receiver { @@ -864,7 +868,7 @@ impl WeakSender { Err(_) => None, Ok(new_value) if new_value > usize::MAX / 2 => { // Make sure the count never overflows, even if lots of sender clones are leaked. - process::abort(); + abort(); } Ok(_) => Some(Sender { channel: self.channel.clone(), @@ -910,7 +914,7 @@ impl WeakReceiver { Err(_) => None, Ok(new_value) if new_value > usize::MAX / 2 => { // Make sure the count never overflows, even if lots of receiver clones are leaked. - process::abort(); + abort(); } Ok(_) => Some(Receiver { channel: self.channel.clone(), @@ -948,7 +952,8 @@ impl SendError { } } -impl error::Error for SendError {} +#[cfg(feature = "std")] +impl std::error::Error for SendError {} impl fmt::Debug for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -998,7 +1003,8 @@ impl TrySendError { } } -impl error::Error for TrySendError {} +#[cfg(feature = "std")] +impl std::error::Error for TrySendError {} impl fmt::Debug for TrySendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -1024,7 +1030,8 @@ impl fmt::Display for TrySendError { #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub struct RecvError; -impl error::Error for RecvError {} +#[cfg(feature = "std")] +impl std::error::Error for RecvError {} impl fmt::Display for RecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -1060,7 +1067,8 @@ impl TryRecvError { } } -impl error::Error for TryRecvError {} +#[cfg(feature = "std")] +impl std::error::Error for TryRecvError {} impl fmt::Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -1076,6 +1084,7 @@ easy_wrapper! { #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError>); + #[cfg(feature = "std")] pub(crate) wait(); } @@ -1125,6 +1134,7 @@ easy_wrapper! { #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Recv<'a, T>(RecvInner<'a, T> => Result); + #[cfg(feature = "std")] pub(crate) wait(); } @@ -1166,3 +1176,20 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> { } } } + +#[cfg(feature = "std")] +use std::process::abort; + +#[cfg(not(feature = "std"))] +fn abort() -> ! { + struct PanicOnDrop; + + impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("Panic while panicking to abort"); + } + } + + let _bomb = PanicOnDrop; + panic!("Panic while panicking to abort") +} diff --git a/tests/bounded.rs b/tests/bounded.rs index abb8895..b525dfb 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -25,6 +25,7 @@ fn smoke() { assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } +#[cfg(feature = "std")] #[test] fn smoke_blocking() { let (s, r) = bounded(1); @@ -459,6 +460,7 @@ fn mpmc_stream() { } } +#[cfg(feature = "std")] #[test] fn weak() { let (s, r) = bounded::(3); diff --git a/tests/unbounded.rs b/tests/unbounded.rs index 31d4987..90395a3 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -24,6 +24,7 @@ fn smoke() { assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } +#[cfg(feature = "std")] #[test] fn smoke_blocking() { let (s, r) = unbounded(); @@ -318,6 +319,7 @@ fn mpmc_stream() { } } +#[cfg(feature = "std")] #[test] fn weak() { let (s, r) = unbounded::(); From 17774b34aefd28ff933e9346245b65d1442fb3fb Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 11 Sep 2023 08:41:59 -0700 Subject: [PATCH 4/4] Switch to using published crates Replace the Git patch and use event-listener v3.0.0 and event-listener-strategy v0.1.0 from crates.io. Signed-off-by: John Nunley --- .github/workflows/ci.yml | 1 - Cargo.toml | 7 ++----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 77fd6e2..8b55472 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,7 +49,6 @@ jobs: - name: Run cargo check (without dev-dependencies to catch missing feature flags) run: cargo hack build --all --no-dev-deps - run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps - - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic msrv: runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 72983ec..ed92ba1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,8 @@ exclude = ["/.*"] [dependencies] concurrent-queue = { version = "2", default-features = false } -event-listener = { version = "2.4.0", default-features = false } -event-listener-strategy = { git = "https://github.com/smol-rs/event-listener", default-features = false } +event-listener = { version = "3.0.0", default-features = false } +event-listener-strategy = { version = "0.2.0", default-features = false } futures-core = { version = "0.3.5", default-features = false } pin-project-lite = "0.2.11" @@ -28,6 +28,3 @@ futures-lite = "1" [features] default = ["std"] std = ["concurrent-queue/std", "event-listener/std", "event-listener-strategy/std"] - -[patch.crates-io] -event-listener = { git = "https://github.com/smol-rs/event-listener", default-features = false }