diff --git a/Cargo.toml b/Cargo.toml index dc66aec..5ec0178 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,14 +7,6 @@ edition = "2021" [features] # async_std -full-ci = ["async-std-executor", "logging-utils", "channel-async-std"] - -tokio-ci = ["tokio-executor", "logging-utils", "channel-tokio"] -async-std-executor = ["dep:async-std"] -tokio-executor = ["dep:tokio", "dep:tokio-stream", "dep:console-subscriber"] -channel-flume = ["flume"] -channel-tokio = ["dep:tokio", "dep:tokio-stream"] -channel-async-std = ["dep:async-channel"] profiling = [ "opentelemetry-jaeger", "tracing-opentelemetry", @@ -27,38 +19,15 @@ logging-utils = ["tracing-subscriber"] [dependencies] async-lock = "2.7" -async-std = { version = "1.12", features = [ - "attributes", - "unstable", -], optional = true } -async-channel = { version = "1.9.0", optional = true } async-trait = "0.1.71" color-eyre = "0.6.2" -flume = { version = "0.10.14", optional = true } futures = "0.3.28" -tokio = { version = "1", optional = true, features = [ - "fs", - "io-util", - "io-std", - "macros", - "net", - "parking_lot", - "process", - "rt", - "rt-multi-thread", - "signal", - "sync", - "time", - "tracing", -] } -tokio-stream = { version = "0.1.14", optional = true } tracing = "0.1.37" tracing-error = "0.2.0" tracing-subscriber = { version = "0.3.17", features = [ "env-filter", "json", ], optional = true } -console-subscriber = { version = "0.1.10", optional = true } opentelemetry = { version = "0.19.0", features = [ "rt-tokio-current-thread", "metrics", @@ -69,3 +38,52 @@ opentelemetry-jaeger = { version = "0.17.0", features = [ "rt-tokio-current-thread", ], optional = true } opentelemetry-aws = { version = "0.7.0", features = ["trace"], optional = true } + +[target.'cfg(all(async_executor_impl = "tokio"))'.dependencies] +tokio = { version = "1", features = [ + "fs", + "io-util", + "io-std", + "macros", + "net", + "parking_lot", + "process", + "rt", + "rt-multi-thread", + "signal", + "sync", + "time", + "tracing", +] } +tokio-stream = { version = "0.1.14" } + +[target.'cfg(all(async_executor_impl = "async-std"))'.dependencies] +async-std = { version = "1.12", features = [ + "attributes", + "unstable", +]} + +[target.'cfg(all(async_channel_impl = "tokio"))'.dependencies] +console-subscriber = { version = "0.1.10" } +tokio = { version = "1", features = [ + "fs", + "io-util", + "io-std", + "macros", + "net", + "parking_lot", + "process", + "rt", + "rt-multi-thread", + "signal", + "sync", + "time", + "tracing", +] } +tokio-stream = { version = "0.1.14" } + +[target.'cfg(all(async_channel_impl = "async-std"))'.dependencies] +async-channel = { version = "1.9.0" } + +[target.'cfg(all(async_channel_impl = "flume"))'.dependencies] +flume = { version = "0.10.14" } diff --git a/README.md b/README.md index 0eb2fdf..328b973 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,17 @@ This crate exports four things: -- A compatibility/abstraction layer for writing async-executor agnostic code. We support two async executors: async-std and tokio. Each may be toggled with a feature flag. -- A compatibility/abstraction layer for writing async channel agnostic code. We support three async channel implementations: async-std-channels. Each may be toggled with a feature flag. +- A compatibility/abstraction layer for writing async-executor agnostic code. We support two async executors: async-std and tokio. Each may be toggled with a configuration flag. +- A compatibility/abstraction layer for writing async channel agnostic code. We support three async channel implementations: async-std-channels. Each may be toggled with a configuration flag. - A library exporting a bunch of useful async primitives. - A tracing configuration layer optionally supporting console and opentelemetry integration. +# Example usage +```bash +RUSTFLAGS='--cfg async_executor_impl="tokio" --cfg async_channel_impl="tokio"' cargo build +``` + +`async_executor_impl` may be either `tokio` or `async-std`. `async_channel_impl` may be either `tokio`, `async-std`, or `flume`. Note that using `tokio` channels requires `tokio` to be the runtime. Note that the async executor impl and async channel impl must be set in order for this crate to compile successfully. diff --git a/src/async_primitives/subscribable_mutex.rs b/src/async_primitives/subscribable_mutex.rs index 4df060d..0d422ed 100644 --- a/src/async_primitives/subscribable_mutex.rs +++ b/src/async_primitives/subscribable_mutex.rs @@ -5,12 +5,12 @@ use futures::{stream::FuturesOrdered, Future, FutureExt}; use std::{fmt, time::Duration}; use tracing::warn; -#[cfg(feature = "async-std-executor")] +#[cfg(all(async_executor_impl = "async-std"))] use async_std::prelude::StreamExt; -#[cfg(feature = "tokio-executor")] +#[cfg(all(async_executor_impl = "tokio"))] use tokio_stream::StreamExt; -#[cfg(not(any(feature = "async-std-executor", feature = "tokio-executor")))] -std::compile_error! {"Either feature \"async-std-executor\" or feature \"tokio-executor\" must be enabled for this crate."} +#[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))] +std::compile_error! {"The cfg flag async_executor_impl must be set in rustflags to either \"async-std\" or \"tokio\" for this crate. Try adding `--cfg async_executor_impl=\"tokio\""} /// A mutex that can register subscribers to be notified. This works in the same way as [`Mutex`], but has some additional functions: /// @@ -251,10 +251,10 @@ mod tests { use std::{sync::Arc, time::Duration}; #[cfg_attr( - feature = "tokio-executor", + async_executor_impl = "tokio", tokio::test(flavor = "multi_thread", worker_threads = 2) )] - #[cfg_attr(feature = "async-std-executor", async_std::test)] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] async fn test_wait_timeout_until() { let mutex: Arc> = Arc::default(); { @@ -276,10 +276,10 @@ mod tests { } #[cfg_attr( - feature = "tokio-executor", + async_executor_impl = "tokio", tokio::test(flavor = "multi_thread", worker_threads = 2) )] - #[cfg_attr(feature = "async-std-executor", async_std::test)] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] async fn test_wait_timeout_until_fail() { let mutex: Arc> = Arc::default(); { @@ -300,10 +300,10 @@ mod tests { } #[cfg_attr( - feature = "tokio-executor", + async_executor_impl = "tokio", tokio::test(flavor = "multi_thread", worker_threads = 2) )] - #[cfg_attr(feature = "async-std-executor", async_std::test)] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] async fn test_compare_and_set() { let mutex = SubscribableMutex::new(5usize); let subscriber = mutex.subscribe().await; @@ -322,10 +322,10 @@ mod tests { } #[cfg_attr( - feature = "tokio-executor", + async_executor_impl = "tokio", tokio::test(flavor = "multi_thread", worker_threads = 2) )] - #[cfg_attr(feature = "async-std-executor", async_std::test)] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] async fn test_subscriber() { let mutex = SubscribableMutex::new(5usize); let subscriber = mutex.subscribe().await; diff --git a/src/channel.rs b/src/channel.rs index 869c19c..a3a01f2 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -21,8 +21,8 @@ mod oneshot; /// Unbounded channels mod unbounded; -#[cfg(all(feature = "async-std-executor", feature = "channel-tokio"))] -compile_error!("feature 'async-std-executor' and 'channel-tokio' cannot be used at the same time; 'channel-tokio' needs the tokio runtime"); +#[cfg(all(async_executor_impl = "async-std", async_channel_impl = "tokio"))] +compile_error!("async_executor_impl = 'async-std-executor' and async_channel_impl = 'channel-tokio' cannot be used together; 'channel-tokio' needs the tokio runtime"); pub use bounded::{bounded, BoundedStream, Receiver, RecvError, SendError, Sender, TryRecvError}; pub use oneshot::{oneshot, OneShotReceiver, OneShotRecvError, OneShotSender, OneShotTryRecvError}; diff --git a/src/channel/bounded.rs b/src/channel/bounded.rs index 228cee4..00675b3 100644 --- a/src/channel/bounded.rs +++ b/src/channel/bounded.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use futures::Stream; /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-tokio")] +#[cfg(all(async_channel_impl = "tokio"))] mod inner { pub use tokio::sync::mpsc::error::{SendError, TryRecvError}; @@ -45,7 +45,7 @@ mod inner { } /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-flume")] +#[cfg(all(async_channel_impl = "flume"))] mod inner { pub use flume::{RecvError, SendError, TryRecvError}; @@ -75,7 +75,7 @@ mod inner { } /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-async-std")] +#[cfg(all(async_channel_impl = "async-std"))] mod inner { pub use async_channel::{RecvError, SendError, TryRecvError}; @@ -114,9 +114,9 @@ impl Sender { /// /// Will return an error if the receiver is dropped pub async fn send(&self, msg: T) -> Result<(), SendError> { - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] let result = self.0.send_async(msg).await; - #[cfg(not(feature = "channel-flume"))] + #[cfg(not(all(async_channel_impl = "flume")))] let result = self.0.send(msg).await; result @@ -130,11 +130,11 @@ impl Receiver { /// /// Will return an error if the sender is dropped pub async fn recv(&mut self) -> Result { - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] let result = self.0.recv_async().await; - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] let result = self.0.recv().await.ok_or(RecvError); - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] let result = self.0.recv().await; result @@ -144,11 +144,11 @@ impl Receiver { where T: 'static, { - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] let result = self.0; - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] let result = tokio_stream::wrappers::ReceiverStream::new(self.0); - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] let result = self.0.into_stream(); BoundedStream(result) @@ -219,14 +219,14 @@ impl Stream for BoundedStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] return >::poll_next(Pin::new(&mut self.0), cx); - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] return as Stream>::poll_next( Pin::new(&mut self.0), cx, ); - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] return as Stream>::poll_next(Pin::new(&mut self.0), cx); } } diff --git a/src/channel/oneshot.rs b/src/channel/oneshot.rs index 0356214..18e1b0f 100644 --- a/src/channel/oneshot.rs +++ b/src/channel/oneshot.rs @@ -1,5 +1,5 @@ /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-tokio")] +#[cfg(all(async_channel_impl = "tokio"))] mod inner { pub use tokio::sync::oneshot::{error::TryRecvError as OneShotTryRecvError, Receiver, Sender}; @@ -40,7 +40,7 @@ mod inner { } /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-flume")] +#[cfg(all(async_channel_impl = "flume"))] mod inner { use flume::{Receiver, Sender}; pub use flume::{RecvError as OneShotRecvError, TryRecvError as OneShotTryRecvError}; @@ -61,7 +61,7 @@ mod inner { } /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-async-std")] +#[cfg(all(async_channel_impl = "async-std"))] mod inner { use async_std::channel::{Receiver, Sender}; pub use async_std::channel::{ @@ -90,11 +90,11 @@ impl OneShotSender { /// /// If this fails because the receiver is dropped, a warning will be printed. pub fn send(self, msg: T) { - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] if self.0.try_send(msg).is_err() { tracing::warn!("Could not send msg on OneShotSender, did the receiver drop?"); } - #[cfg(not(feature = "channel-async-std"))] + #[cfg(not(all(async_channel_impl = "async-std")))] if self.0.send(msg).is_err() { tracing::warn!("Could not send msg on OneShotSender, did the receiver drop?"); } @@ -108,11 +108,11 @@ impl OneShotReceiver { /// /// Will return an error if the sender channel is dropped pub async fn recv(self) -> Result { - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] let result = self.0.await.map_err(Into::into); - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] let result = self.0.recv_async().await; - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] let result = self.0.recv().await; result diff --git a/src/channel/unbounded.rs b/src/channel/unbounded.rs index 787c4ef..0dd4eaf 100644 --- a/src/channel/unbounded.rs +++ b/src/channel/unbounded.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use futures::Stream; /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-tokio")] +#[cfg(all(async_channel_impl = "tokio"))] mod inner { pub use tokio::sync::mpsc::error::{ SendError as UnboundedSendError, TryRecvError as UnboundedTryRecvError, @@ -56,7 +56,7 @@ mod inner { } /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-flume")] +#[cfg(all(async_channel_impl = "flume"))] mod inner { use flume::{r#async::RecvStream, Receiver, Sender}; pub use flume::{ @@ -92,7 +92,7 @@ mod inner { } /// inner module, used to group feature-specific imports -#[cfg(feature = "channel-async-std")] +#[cfg(all(async_channel_impl = "async-std"))] mod inner { use async_std::channel::{Receiver, Sender}; pub use async_std::channel::{ @@ -137,11 +137,11 @@ impl UnboundedSender { /// This may fail if the receiver is dropped. #[allow(clippy::unused_async)] // under tokio this function is actually sync pub async fn send(&self, msg: T) -> Result<(), UnboundedSendError> { - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] let result = self.0.send_async(msg).await; - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] let result = self.0.send(msg); - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] let result = self.0.send(msg).await; result } @@ -156,21 +156,21 @@ impl UnboundedReceiver { /// /// Will produce an error if all senders are dropped pub async fn recv(&self) -> Result { - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] let result = self.0.recv_async().await; - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] let result = self.0.lock().await.recv().await.ok_or(UnboundedRecvError); - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] let result = self.0.recv().await; result } /// Turn this receiver into a stream. pub fn into_stream(self) -> UnboundedStream { - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] let result = self.0; - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] let result = tokio_stream::wrappers::UnboundedReceiverStream::new(self.0.into_inner()); - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] let result = self.0.into_stream(); UnboundedStream(result) @@ -181,11 +181,11 @@ impl UnboundedReceiver { /// /// Will return an error if no value is currently queued. This function will not block. pub fn try_recv(&self) -> Result { - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] // TODO: Check if this actually doesn't block let result = crate::art::async_block_on(self.0.lock()).try_recv(); - #[cfg(not(feature = "channel-tokio"))] + #[cfg(not(all(async_channel_impl = "tokio")))] let result = self.0.try_recv(); result @@ -245,9 +245,9 @@ impl UnboundedReceiver { #[allow(clippy::len_without_is_empty, clippy::unused_self)] #[must_use] pub fn len(&self) -> Option { - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] let result = None; - #[cfg(not(feature = "channel-tokio"))] + #[cfg(not(all(async_channel_impl = "tokio")))] let result = Some(self.0.len()); result } @@ -260,14 +260,14 @@ impl Stream for UnboundedStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - #[cfg(feature = "channel-flume")] + #[cfg(all(async_channel_impl = "flume"))] return >::poll_next(Pin::new(&mut self.0), cx); - #[cfg(feature = "channel-tokio")] + #[cfg(all(async_channel_impl = "tokio"))] return as Stream>::poll_next( Pin::new(&mut self.0), cx, ); - #[cfg(feature = "channel-async-std")] + #[cfg(all(async_channel_impl = "async-std"))] return as Stream>::poll_next(Pin::new(&mut self.0), cx); } } diff --git a/src/lib.rs b/src/lib.rs index 016242c..896a640 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,19 +3,21 @@ //! with a feature flag toggle //! while exposing the same interface for general consumption between both -#[cfg(all(feature = "async-std-executor", feature = "tokio-executor"))] -std::compile_error!("Both feature \"async-std-executor\" and feature \"tokio-executor\" must not be concurrently enabled for this crate."); +#[cfg(all(async_executor_impl = "async-std", async_executor_impl = "tokio"))] +std::compile_error!( + "Both cfg options \"async-std\" and \"tokio\" must not be concurrently enabled for this crate." +); -#[cfg(not(any(feature = "async-std-executor", feature = "tokio-executor")))] -compile_error! {"Either feature \"async-std-executor\" or feature \"tokio-executor\" must be enabled for this crate."} +#[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))] +compile_error! {"Either config option \"async-std\" or \"tokio\" must be enabled for this crate."} /// abstraction over both `tokio` and `async-std`, making it possible to use either based on a feature flag -#[cfg(feature = "async-std-executor")] +#[cfg(async_executor_impl = "async-std")] #[path = "art/async-std.rs"] pub mod art; /// abstraction over both `tokio` and `async-std`, making it possible to use either based on a feature flag -#[cfg(feature = "tokio-executor")] +#[cfg(async_executor_impl = "tokio")] #[path = "art/tokio.rs"] pub mod art; diff --git a/src/logging.rs b/src/logging.rs index 081f902..00d4137 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -66,13 +66,13 @@ fn gen_opentelemetry_layer() -> opentelemetry::sdk::trace::Tracer { /// I couldn't get the types to play nicely with a generic function macro_rules! complete_init { ( $R:expr ) => { - #[cfg(feature = "tokio-executor")] + #[cfg(async_executor_impl = "tokio")] let console_layer = var("TOKIO_CONSOLE_ENABLED") == Ok("true".to_string()); #[cfg(feature = "profiling")] let tracer_enabled = var("OTL_ENABLED") == Ok("true".to_string()); - #[cfg(all(feature = "tokio-executor", feature = "profiling"))] + #[cfg(all(async_executor_impl = "tokio", feature = "profiling"))] if console_layer && tracer_enabled { let registry = $R.with(console_subscriber::spawn()); let registry = registry @@ -88,7 +88,7 @@ macro_rules! complete_init { return; } - #[cfg(feature = "tokio-executor")] + #[cfg(async_executor_impl = "tokio")] if console_layer { $R.with(console_subscriber::spawn()).init(); return;