diff --git a/async-queues/Cargo.toml b/async-queues/Cargo.toml index 571d976..590cd5e 100644 --- a/async-queues/Cargo.toml +++ b/async-queues/Cargo.toml @@ -6,6 +6,14 @@ authors = ["Brian Martin "] repository = "https://github.com/iopsystems/workloads" license = "MIT OR Apache-2.0" +[[bin]] +name = "async-queues-broadcast" +path = "src/async-queues-broadcast.rs" + +[[bin]] +name = "async-queues-mpmc" +path = "src/async-queues-mpmc.rs" + [dependencies] async-broadcast = "0.6.0" async-channel = "2.1.1" @@ -25,7 +33,3 @@ splaycast = "0.2.0" tokio = { version = "1.35.1", features = ["full"] } tokio-stream = "0.1.14" widecast = { git = "https://github.com/Phantomical/widecast" } - -[[bin]] -name = "async-queues" -path = "src/main.rs" diff --git a/async-queues/src/async-queues-broadcast.rs b/async-queues/src/async-queues-broadcast.rs new file mode 100644 index 0000000..1382670 --- /dev/null +++ b/async-queues/src/async-queues-broadcast.rs @@ -0,0 +1,25 @@ +use async_queues::broadcast; +use async_queues::broadcast::{Config, Test}; +use clap::Parser; + +fn main() { + let config = Config::parse(); + + match config.test() { + Test::AsyncBroadcast => { + let _ = broadcast::async_broadcast::run(config); + } + Test::Broadcaster => { + let _ = broadcast::broadcaster::run(config); + } + Test::Splaycast => { + let _ = broadcast::splaycast::run(config); + } + Test::Tokio => { + let _ = broadcast::tokio::run(config); + } + Test::Widecast => { + let _ = broadcast::widecast::run(config); + } + }; +} diff --git a/async-queues/src/async-queues-mpmc.rs b/async-queues/src/async-queues-mpmc.rs new file mode 100644 index 0000000..8a81f4f --- /dev/null +++ b/async-queues/src/async-queues-mpmc.rs @@ -0,0 +1,25 @@ +use async_queues::mpmc; +use async_queues::mpmc::{Config, Test}; +use clap::Parser; + +fn main() { + let config = Config::parse(); + + match config.test() { + Test::AsyncChannel => { + let _ = mpmc::async_channel::run(config); + } + Test::Asyncstd => { + let _ = mpmc::asyncstd::run(config); + } + Test::Flume => { + let _ = mpmc::flume::run(config); + } + Test::Kanal => { + let _ = mpmc::kanal::run(config); + } + Test::Postage => { + let _ = mpmc::postage::run(config); + } + }; +} diff --git a/async-queues/src/async_broadcast.rs b/async-queues/src/broadcast/async_broadcast.rs similarity index 99% rename from async-queues/src/async_broadcast.rs rename to async-queues/src/broadcast/async_broadcast.rs index 887fef1..9bf091e 100644 --- a/async-queues/src/async_broadcast.rs +++ b/async-queues/src/broadcast/async_broadcast.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use ::async_broadcast::{Receiver, RecvError, Sender}; diff --git a/async-queues/src/broadcaster_broadcast.rs b/async-queues/src/broadcast/broadcaster.rs similarity index 99% rename from async-queues/src/broadcaster_broadcast.rs rename to async-queues/src/broadcast/broadcaster.rs index 4d81920..917bc5b 100644 --- a/async-queues/src/broadcaster_broadcast.rs +++ b/async-queues/src/broadcast/broadcaster.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use broadcaster::*; diff --git a/async-queues/src/broadcast/mod.rs b/async-queues/src/broadcast/mod.rs new file mode 100644 index 0000000..fbc308b --- /dev/null +++ b/async-queues/src/broadcast/mod.rs @@ -0,0 +1,202 @@ +use crate::*; +use clap::Parser; +use ratelimit::Ratelimiter; +use std::sync::Arc; + +pub mod async_broadcast; +pub mod broadcaster; +pub mod splaycast; +pub mod tokio; +pub mod widecast; + +#[derive(ValueEnum, Copy, Clone, Debug, PartialEq)] +#[clap(rename_all = "snake_case")] +pub enum Test { + AsyncBroadcast, + Broadcaster, + Splaycast, + Tokio, + Widecast, +} + +#[derive(Parser, Debug, Clone)] +#[command(author, version, about, long_about = None)] +pub struct Config { + #[arg(long)] + test: Test, + + #[arg(long, default_value_t = 60)] + duration: u64, + + #[arg(long, default_value_t = 128)] + queue_depth: usize, + + #[arg(long, default_value_t = false)] + split_runtime: bool, + + #[arg(long, default_value_t = 64)] + message_length: usize, + + #[arg(long, default_value_t = 1)] + threads: usize, + + #[arg(long, default_value_t = 61)] + global_queue_interval: u32, + + #[arg(long, default_value_t = 61)] + event_interval: u32, + + #[arg(long, default_value_t = 1)] + publishers: usize, + #[arg(long, default_value_t = 1)] + publisher_threads: usize, + + #[arg(long, default_value_t = 1000)] + publish_rate: u64, + + #[arg(long, default_value_t = 1)] + subscribers: usize, + #[arg(long, default_value_t = 1)] + subscriber_threads: usize, + + #[arg(long, default_value_t = 0)] + fanout: u8, + #[arg(long, default_value_t = 1)] + fanout_threads: usize, + + #[arg(long, default_value = None)] + histogram: Option, +} + +impl Config { + pub fn test(&self) -> Test { + self.test + } + + /// Create a ratelimiter for message sending based on the config + pub fn ratelimiter(&self) -> Arc> { + if self.publish_rate == 0 { + return Arc::new(None); + } + + let quanta = (self.publish_rate / 1_000_000) + 1; + let delay = quanta * Duration::from_secs(1).as_nanos() as u64 / self.publish_rate; + + Arc::new(Some( + Ratelimiter::builder(quanta, Duration::from_nanos(delay)) + .max_tokens(quanta) + .build() + .unwrap(), + )) + } + + /// Return the queue depth to be used for the channel/queue + pub fn queue_depth(&self) -> usize { + self.queue_depth + } + + pub fn subscribers(&self) -> usize { + self.subscribers + } + + pub fn publishers(&self) -> usize { + self.publishers + } + + pub fn fanout(&self) -> u8 { + self.fanout + } + + pub fn message_length(&self) -> usize { + self.message_length + } + + /// Creates a collection of tokio runtimes, either one combined runtime or + /// a dual runtime depending on the configuration + pub fn runtime(&self) -> Runtime { + let combined = self._runtime(self.threads); + + let publisher = if !self.split_runtime { + None + } else { + Some(self._runtime(self.publisher_threads)) + }; + + let subscriber = if !self.split_runtime { + None + } else { + Some(self._runtime(self.subscriber_threads)) + }; + + let fanout = if !self.split_runtime { + None + } else { + Some(self._runtime(self.fanout_threads)) + }; + + Runtime { + config: self.clone(), + combined, + publisher, + subscriber, + fanout, + } + } + + /// Internal function to create a tokio runtime with a given number of + /// threads. This makes sure we use consistent configuration for each + /// runtime + fn _runtime(&self, threads: usize) -> ::tokio::runtime::Runtime { + ::tokio::runtime::Builder::new_multi_thread() + .worker_threads(threads) + .enable_all() + .global_queue_interval(self.global_queue_interval) + .event_interval(self.event_interval) + .build() + .expect("failed to initialize runtime") + } +} + +impl AsyncQueuesConfig for Config { + fn duration(&self) -> std::time::Duration { + core::time::Duration::from_secs(self.duration) + } + + fn histogram(&self) -> std::option::Option<&str> { + self.histogram.as_deref() + } + + fn global_queue_interval(&self) -> u32 { + self.global_queue_interval + } + + fn event_interval(&self) -> u32 { + self.event_interval + } +} + +impl std::fmt::Display for Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.split_runtime { + write!(f, "publishers: {} publisher_threads: {} subscribers: {} subscriber_threads: {} fanout: {} fanout_threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}", + self.publishers, + self.publisher_threads, + self.subscribers, + self.subscriber_threads, + self.fanout, + self.fanout_threads, + self.publish_rate, + self.queue_depth, + SEND.value(), RECV_OK.value(), DROPPED.value()) + } else { + write!(f, "publishers: {} subscribers: {} fanout: {} threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}", + self.publishers, + self.subscribers, + self.fanout, + self.threads, + self.publish_rate, + self.queue_depth, + SEND.value(), RECV_OK.value(), DROPPED.value()) + } + } +} diff --git a/async-queues/src/splaycast_broadcast.rs b/async-queues/src/broadcast/splaycast.rs similarity index 99% rename from async-queues/src/splaycast_broadcast.rs rename to async-queues/src/broadcast/splaycast.rs index a6778a2..3ed9232 100644 --- a/async-queues/src/splaycast_broadcast.rs +++ b/async-queues/src/broadcast/splaycast.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use futures_lite::StreamExt; diff --git a/async-queues/src/tokio_broadcast.rs b/async-queues/src/broadcast/tokio.rs similarity index 99% rename from async-queues/src/tokio_broadcast.rs rename to async-queues/src/broadcast/tokio.rs index 97b3d62..e3473e7 100644 --- a/async-queues/src/tokio_broadcast.rs +++ b/async-queues/src/broadcast/tokio.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use ratelimit::Ratelimiter; diff --git a/async-queues/src/widecast_broadcast.rs b/async-queues/src/broadcast/widecast.rs similarity index 99% rename from async-queues/src/widecast_broadcast.rs rename to async-queues/src/broadcast/widecast.rs index 1ad28b4..38c4567 100644 --- a/async-queues/src/widecast_broadcast.rs +++ b/async-queues/src/broadcast/widecast.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use ratelimit::Ratelimiter; diff --git a/async-queues/src/lib.rs b/async-queues/src/lib.rs index c939099..051eeea 100644 --- a/async-queues/src/lib.rs +++ b/async-queues/src/lib.rs @@ -1,40 +1,14 @@ -use clap::{Parser, ValueEnum}; +use clap::ValueEnum; use histogram::SparseHistogram; use metriken::{metric, AtomicHistogram, Counter}; -use ratelimit::Ratelimiter; use std::future::Future; use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::task::JoinHandle; -pub mod async_broadcast; -pub mod broadcaster_broadcast; -pub mod splaycast_broadcast; -pub mod tokio_broadcast; -pub mod widecast_broadcast; - -pub mod async_channel; -pub mod asyncstd_channel; -pub mod flume_channel; -pub mod kanal_channel; -pub mod postage_channel; - -#[derive(ValueEnum, Copy, Clone, Debug, PartialEq)] -#[clap(rename_all = "snake_case")] -pub enum Test { - AsyncBroadcast, - BroadcasterBroadcast, - SplaycastBroadcast, - TokioBroadcast, - WidecastBroadcast, - AsyncChannel, - AsyncstdChannel, - FlumeChannel, - KanalChannel, - PostageChannel, -} +pub mod broadcast; +pub mod mpmc; pub static RUNNING: AtomicBool = AtomicBool::new(true); @@ -71,144 +45,6 @@ pub static RECV_BYTES: Counter = Counter::new(); #[metric(name = "dropped")] pub static DROPPED: Counter = Counter::new(); -#[derive(Parser, Debug, Clone)] -#[command(author, version, about, long_about = None)] -pub struct Config { - #[arg(long)] - test: Test, - - #[arg(long, default_value_t = 60)] - duration: u64, - - #[arg(long, default_value_t = 128)] - queue_depth: usize, - - #[arg(long, default_value_t = false)] - split_runtime: bool, - - #[arg(long, default_value_t = 64)] - message_length: usize, - - #[arg(long, default_value_t = 1)] - threads: usize, - - #[arg(long, default_value_t = 61)] - global_queue_interval: u32, - - #[arg(long, default_value_t = 61)] - event_interval: u32, - - #[arg(long, default_value_t = 1)] - publishers: usize, - #[arg(long, default_value_t = 1)] - publisher_threads: usize, - - #[arg(long, default_value_t = 1000)] - publish_rate: u64, - - #[arg(long, default_value_t = 1)] - subscribers: usize, - #[arg(long, default_value_t = 1)] - subscriber_threads: usize, - - #[arg(long, default_value_t = 0)] - fanout: u8, - #[arg(long, default_value_t = 1)] - fanout_threads: usize, - - #[arg(long, default_value = None)] - histogram: Option, -} - -impl Config { - pub fn test(&self) -> Test { - self.test - } - - /// Create a ratelimiter for message sending based on the config - pub fn ratelimiter(&self) -> Arc> { - if self.publish_rate == 0 { - return Arc::new(None); - } - - let quanta = (self.publish_rate / 1_000_000) + 1; - let delay = quanta * Duration::from_secs(1).as_nanos() as u64 / self.publish_rate; - - Arc::new(Some( - Ratelimiter::builder(quanta, Duration::from_nanos(delay)) - .max_tokens(quanta) - .build() - .unwrap(), - )) - } - - /// Return the queue depth to be used for the channel/queue - pub fn queue_depth(&self) -> usize { - self.queue_depth - } - - pub fn subscribers(&self) -> usize { - self.subscribers - } - - pub fn publishers(&self) -> usize { - self.publishers - } - - pub fn fanout(&self) -> u8 { - self.fanout - } - - pub fn message_length(&self) -> usize { - self.message_length - } - - /// Creates a collection of tokio runtimes, either one combined runtime or - /// a dual runtime depending on the configuration - pub fn runtime(&self) -> Runtime { - let combined = self._runtime(self.threads); - - let publisher = if !self.split_runtime { - None - } else { - Some(self._runtime(self.publisher_threads)) - }; - - let subscriber = if !self.split_runtime { - None - } else { - Some(self._runtime(self.subscriber_threads)) - }; - - let fanout = if !self.split_runtime { - None - } else { - Some(self._runtime(self.fanout_threads)) - }; - - Runtime { - config: self.clone(), - combined, - publisher, - subscriber, - fanout, - } - } - - /// Internal function to create a tokio runtime with a given number of - /// threads. This makes sure we use consistent configuration for each - /// runtime - fn _runtime(&self, threads: usize) -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(threads) - .enable_all() - .global_queue_interval(self.global_queue_interval) - .event_interval(self.event_interval) - .build() - .expect("failed to initialize runtime") - } -} - #[derive(Clone)] pub struct Message { timestamp: Instant, @@ -231,17 +67,26 @@ impl Message { } } +pub trait AsyncQueuesConfig { + fn duration(&self) -> Duration; + + fn histogram(&self) -> Option<&str>; + + fn global_queue_interval(&self) -> u32; + fn event_interval(&self) -> u32; +} + /// An abstraction for having either a combined runtime, or separate runtimes /// for publishers and subscribers -pub struct Runtime { - config: Config, +pub struct Runtime { + config: T, combined: tokio::runtime::Runtime, publisher: Option, subscriber: Option, fanout: Option, } -impl Runtime { +impl Runtime { pub fn spawn_publisher(&self, future: F) -> JoinHandle where F: Future + Send + 'static, @@ -270,7 +115,7 @@ impl Runtime { let start = std::time::Instant::now(); RUNNING.store(true, Ordering::Relaxed); - std::thread::sleep(Duration::from_secs(self.config.duration)); + std::thread::sleep(self.config.duration()); let stop = std::time::Instant::now(); RUNNING.store(false, Ordering::Relaxed); @@ -279,31 +124,10 @@ impl Runtime { println!( "global_queue_interval: {} event_interval: {}", - self.config.global_queue_interval, self.config.event_interval + self.config.global_queue_interval(), + self.config.event_interval() ); - if self.config.split_runtime { - println!("publishers: {} publisher_threads: {} subscribers: {} subscriber_threads: {} fanout: {} fanout_threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}", - self.config.publishers, - self.config.publisher_threads, - self.config.subscribers, - self.config.subscriber_threads, - self.config.fanout, - self.config.fanout_threads, - self.config.publish_rate, - self.config.queue_depth, - SEND.value(), RECV_OK.value(), DROPPED.value()); - } else { - println!("publishers: {} subscribers: {} fanout: {} threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}", - self.config.publishers, - self.config.subscribers, - self.config.fanout, - self.config.threads, - self.config.publish_rate, - self.config.queue_depth, - SEND.value(), RECV_OK.value(), DROPPED.value()); - } - let elapsed = stop.duration_since(start).as_secs_f64(); println!( "publish/s: {:.2} receive/s: {:.2} drop/s: {:.2}", @@ -327,7 +151,7 @@ impl Runtime { latencies[6].1.end() / 1000, ); - if let Some(path) = self.config.histogram { + if let Some(path) = self.config.histogram() { let sparse = SparseHistogram::from(&histogram); let json = serde_json::to_string(&sparse).expect("failed to serialize"); let mut file = std::fs::File::create(path).expect("failed to create file"); diff --git a/async-queues/src/main.rs b/async-queues/src/main.rs deleted file mode 100644 index 24a4280..0000000 --- a/async-queues/src/main.rs +++ /dev/null @@ -1,39 +0,0 @@ -use async_queues::*; -use clap::Parser; - -fn main() { - let config = Config::parse(); - - match config.test() { - Test::AsyncBroadcast => { - let _ = async_broadcast::run(config); - } - Test::AsyncChannel => { - let _ = async_channel::run(config); - } - Test::AsyncstdChannel => { - let _ = asyncstd_channel::run(config); - } - Test::BroadcasterBroadcast => { - let _ = broadcaster_broadcast::run(config); - } - Test::FlumeChannel => { - let _ = flume_channel::run(config); - } - Test::KanalChannel => { - let _ = kanal_channel::run(config); - } - Test::PostageChannel => { - let _ = postage_channel::run(config); - } - Test::SplaycastBroadcast => { - let _ = splaycast_broadcast::run(config); - } - Test::TokioBroadcast => { - let _ = tokio_broadcast::run(config); - } - Test::WidecastBroadcast => { - let _ = widecast_broadcast::run(config); - } - }; -} diff --git a/async-queues/src/async_channel.rs b/async-queues/src/mpmc/async_channel.rs similarity index 94% rename from async-queues/src/async_channel.rs rename to async-queues/src/mpmc/async_channel.rs index df9b30e..1194067 100644 --- a/async-queues/src/async_channel.rs +++ b/async-queues/src/mpmc/async_channel.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use ::async_channel::{Receiver, Sender}; @@ -13,11 +14,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); } diff --git a/async-queues/src/asyncstd_channel.rs b/async-queues/src/mpmc/asyncstd.rs similarity index 95% rename from async-queues/src/asyncstd_channel.rs rename to async-queues/src/mpmc/asyncstd.rs index 04e0d64..0ee0a2e 100644 --- a/async-queues/src/asyncstd_channel.rs +++ b/async-queues/src/mpmc/asyncstd.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use async_std::channel::{Receiver, Sender}; @@ -13,11 +14,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); } diff --git a/async-queues/src/flume_channel.rs b/async-queues/src/mpmc/flume.rs similarity index 94% rename from async-queues/src/flume_channel.rs rename to async-queues/src/mpmc/flume.rs index 80dc977..4377630 100644 --- a/async-queues/src/flume_channel.rs +++ b/async-queues/src/mpmc/flume.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use flume::{Receiver, Sender}; @@ -13,11 +14,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); } diff --git a/async-queues/src/kanal_channel.rs b/async-queues/src/mpmc/kanal.rs similarity index 95% rename from async-queues/src/kanal_channel.rs rename to async-queues/src/mpmc/kanal.rs index 546680d..6eb0df4 100644 --- a/async-queues/src/kanal_channel.rs +++ b/async-queues/src/mpmc/kanal.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use kanal::{AsyncReceiver, AsyncSender}; @@ -13,11 +14,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); } diff --git a/async-queues/src/mpmc/mod.rs b/async-queues/src/mpmc/mod.rs new file mode 100644 index 0000000..55d39ca --- /dev/null +++ b/async-queues/src/mpmc/mod.rs @@ -0,0 +1,184 @@ +use crate::*; +use clap::Parser; +use ratelimit::Ratelimiter; +use std::sync::Arc; + +pub mod async_channel; +pub mod asyncstd; +pub mod flume; +pub mod kanal; +pub mod postage; + +#[derive(ValueEnum, Copy, Clone, Debug, PartialEq)] +#[clap(rename_all = "snake_case")] +pub enum Test { + AsyncChannel, + Asyncstd, + Flume, + Kanal, + Postage, +} + +#[derive(Parser, Debug, Clone)] +#[command(author, version, about, long_about = None)] +pub struct Config { + #[arg(long)] + test: Test, + + #[arg(long, default_value_t = 60)] + duration: u64, + + #[arg(long, default_value_t = 128)] + queue_depth: usize, + + #[arg(long, default_value_t = false)] + split_runtime: bool, + + #[arg(long, default_value_t = 64)] + message_length: usize, + + #[arg(long, default_value_t = 1)] + threads: usize, + + #[arg(long, default_value_t = 61)] + global_queue_interval: u32, + + #[arg(long, default_value_t = 61)] + event_interval: u32, + + #[arg(long, default_value_t = 1)] + producers: usize, + #[arg(long, default_value_t = 1)] + producer_threads: usize, + + #[arg(long, default_value_t = 1000)] + producer_rate: u64, + + #[arg(long, default_value_t = 1)] + consumers: usize, + #[arg(long, default_value_t = 1)] + consumer_threads: usize, + + #[arg(long, default_value = None)] + histogram: Option, +} + +impl Config { + pub fn test(&self) -> Test { + self.test + } + + /// Create a ratelimiter for message sending based on the config + pub fn ratelimiter(&self) -> Arc> { + if self.producer_rate == 0 { + return Arc::new(None); + } + + let quanta = (self.producer_rate / 1_000_000) + 1; + let delay = quanta * Duration::from_secs(1).as_nanos() as u64 / self.producer_rate; + + Arc::new(Some( + Ratelimiter::builder(quanta, Duration::from_nanos(delay)) + .max_tokens(quanta) + .build() + .unwrap(), + )) + } + + /// Return the queue depth to be used for the channel/queue + pub fn queue_depth(&self) -> usize { + self.queue_depth + } + + pub fn consumers(&self) -> usize { + self.consumers + } + + pub fn producers(&self) -> usize { + self.producers + } + + pub fn message_length(&self) -> usize { + self.message_length + } + + /// Creates a collection of tokio runtimes, either one combined runtime or + /// a dual runtime depending on the configuration + pub fn runtime(&self) -> Runtime { + let combined = self._runtime(self.threads); + + let publisher = if !self.split_runtime { + None + } else { + Some(self._runtime(self.producer_threads)) + }; + + let subscriber = if !self.split_runtime { + None + } else { + Some(self._runtime(self.consumer_threads)) + }; + + Runtime { + config: self.clone(), + combined, + publisher, + subscriber, + fanout: None, + } + } + + /// Internal function to create a tokio runtime with a given number of + /// threads. This makes sure we use consistent configuration for each + /// runtime + fn _runtime(&self, threads: usize) -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(threads) + .enable_all() + .global_queue_interval(self.global_queue_interval) + .event_interval(self.event_interval) + .build() + .expect("failed to initialize runtime") + } +} + +impl AsyncQueuesConfig for Config { + fn duration(&self) -> std::time::Duration { + core::time::Duration::from_secs(self.duration) + } + + fn histogram(&self) -> std::option::Option<&str> { + self.histogram.as_deref() + } + + fn global_queue_interval(&self) -> u32 { + self.global_queue_interval + } + + fn event_interval(&self) -> u32 { + self.event_interval + } +} + +impl std::fmt::Display for Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.split_runtime { + write!(f, "producers: {} producer_threads: {} consumers: {} consumer_threads: {} producer_rate: {} queue_depth: {} send: {} recv: {} drop: {}", + self.producers, + self.producer_threads, + self.consumers, + self.consumer_threads, + self.producer_rate, + self.queue_depth, + SEND.value(), RECV_OK.value(), DROPPED.value()) + } else { + write!(f, "producers: {} consumers: {} threads: {} producer_rate: {} queue_depth: {} send: {} recv: {} drop: {}", + self.producers, + self.consumers, + self.threads, + self.producer_rate, + self.queue_depth, + SEND.value(), RECV_OK.value(), DROPPED.value()) + } + } +} diff --git a/async-queues/src/postage_channel.rs b/async-queues/src/mpmc/postage.rs similarity index 95% rename from async-queues/src/postage_channel.rs rename to async-queues/src/mpmc/postage.rs index 4d9c0d9..9df78c1 100644 --- a/async-queues/src/postage_channel.rs +++ b/async-queues/src/mpmc/postage.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use postage::dispatch::{Receiver, Sender}; @@ -15,11 +16,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); }