Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async-queues: refactoring #3

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions async-queues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ authors = ["Brian Martin <[email protected]>"]
repository = "https://github.com/iopsystems/workloads"
license = "MIT OR Apache-2.0"

[[bin]]
name = "async-queues-broadcast"
path = "src/async-queues-broadcast.rs"

[[bin]]
name = "async-queues-mpmc"
path = "src/async-queues-mpmc.rs"

[dependencies]
async-broadcast = "0.6.0"
async-channel = "2.1.1"
Expand All @@ -25,7 +33,3 @@ splaycast = "0.2.0"
tokio = { version = "1.35.1", features = ["full"] }
tokio-stream = "0.1.14"
widecast = { git = "https://github.com/Phantomical/widecast" }

[[bin]]
name = "async-queues"
path = "src/main.rs"
25 changes: 25 additions & 0 deletions async-queues/src/async-queues-broadcast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use async_queues::broadcast;
use async_queues::broadcast::{Config, Test};
use clap::Parser;

fn main() {
let config = Config::parse();

match config.test() {
Test::AsyncBroadcast => {
let _ = broadcast::async_broadcast::run(config);
}
Test::Broadcaster => {
let _ = broadcast::broadcaster::run(config);
}
Test::Splaycast => {
let _ = broadcast::splaycast::run(config);
}
Test::Tokio => {
let _ = broadcast::tokio::run(config);
}
Test::Widecast => {
let _ = broadcast::widecast::run(config);
}
};
}
25 changes: 25 additions & 0 deletions async-queues/src/async-queues-mpmc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use async_queues::mpmc;
use async_queues::mpmc::{Config, Test};
use clap::Parser;

fn main() {
let config = Config::parse();

match config.test() {
Test::AsyncChannel => {
let _ = mpmc::async_channel::run(config);
}
Test::Asyncstd => {
let _ = mpmc::asyncstd::run(config);
}
Test::Flume => {
let _ = mpmc::flume::run(config);
}
Test::Kanal => {
let _ = mpmc::kanal::run(config);
}
Test::Postage => {
let _ = mpmc::postage::run(config);
}
};
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Config;
use crate::*;

use ::async_broadcast::{Receiver, RecvError, Sender};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Config;
use crate::*;

use broadcaster::*;
Expand Down
202 changes: 202 additions & 0 deletions async-queues/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use crate::*;
use clap::Parser;
use ratelimit::Ratelimiter;
use std::sync::Arc;

pub mod async_broadcast;
pub mod broadcaster;
pub mod splaycast;
pub mod tokio;
pub mod widecast;

#[derive(ValueEnum, Copy, Clone, Debug, PartialEq)]
#[clap(rename_all = "snake_case")]
pub enum Test {
AsyncBroadcast,
Broadcaster,
Splaycast,
Tokio,
Widecast,
}

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Config {
#[arg(long)]
test: Test,

#[arg(long, default_value_t = 60)]
duration: u64,

#[arg(long, default_value_t = 128)]
queue_depth: usize,

#[arg(long, default_value_t = false)]
split_runtime: bool,

#[arg(long, default_value_t = 64)]
message_length: usize,

#[arg(long, default_value_t = 1)]
threads: usize,

#[arg(long, default_value_t = 61)]
global_queue_interval: u32,

#[arg(long, default_value_t = 61)]
event_interval: u32,

#[arg(long, default_value_t = 1)]
publishers: usize,
#[arg(long, default_value_t = 1)]
publisher_threads: usize,

#[arg(long, default_value_t = 1000)]
publish_rate: u64,

#[arg(long, default_value_t = 1)]
subscribers: usize,
#[arg(long, default_value_t = 1)]
subscriber_threads: usize,

#[arg(long, default_value_t = 0)]
fanout: u8,
#[arg(long, default_value_t = 1)]
fanout_threads: usize,

#[arg(long, default_value = None)]
histogram: Option<String>,
}

impl Config {
pub fn test(&self) -> Test {
self.test
}

/// Create a ratelimiter for message sending based on the config
pub fn ratelimiter(&self) -> Arc<Option<Ratelimiter>> {
if self.publish_rate == 0 {
return Arc::new(None);
}

let quanta = (self.publish_rate / 1_000_000) + 1;
let delay = quanta * Duration::from_secs(1).as_nanos() as u64 / self.publish_rate;

Arc::new(Some(
Ratelimiter::builder(quanta, Duration::from_nanos(delay))
.max_tokens(quanta)
.build()
.unwrap(),
))
}

/// Return the queue depth to be used for the channel/queue
pub fn queue_depth(&self) -> usize {
self.queue_depth
}

pub fn subscribers(&self) -> usize {
self.subscribers
}

pub fn publishers(&self) -> usize {
self.publishers
}

pub fn fanout(&self) -> u8 {
self.fanout
}

pub fn message_length(&self) -> usize {
self.message_length
}

/// Creates a collection of tokio runtimes, either one combined runtime or
/// a dual runtime depending on the configuration
pub fn runtime(&self) -> Runtime<Self> {
let combined = self._runtime(self.threads);

let publisher = if !self.split_runtime {
None
} else {
Some(self._runtime(self.publisher_threads))
};

let subscriber = if !self.split_runtime {
None
} else {
Some(self._runtime(self.subscriber_threads))
};

let fanout = if !self.split_runtime {
None
} else {
Some(self._runtime(self.fanout_threads))
};

Runtime {
config: self.clone(),
combined,
publisher,
subscriber,
fanout,
}
}

/// Internal function to create a tokio runtime with a given number of
/// threads. This makes sure we use consistent configuration for each
/// runtime
fn _runtime(&self, threads: usize) -> ::tokio::runtime::Runtime {
::tokio::runtime::Builder::new_multi_thread()
.worker_threads(threads)
.enable_all()
.global_queue_interval(self.global_queue_interval)
.event_interval(self.event_interval)
.build()
.expect("failed to initialize runtime")
}
}

impl AsyncQueuesConfig for Config {
fn duration(&self) -> std::time::Duration {
core::time::Duration::from_secs(self.duration)
}

fn histogram(&self) -> std::option::Option<&str> {
self.histogram.as_deref()
}

fn global_queue_interval(&self) -> u32 {
self.global_queue_interval
}

fn event_interval(&self) -> u32 {
self.event_interval
}
}

impl std::fmt::Display for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.split_runtime {
write!(f, "publishers: {} publisher_threads: {} subscribers: {} subscriber_threads: {} fanout: {} fanout_threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}",
self.publishers,
self.publisher_threads,
self.subscribers,
self.subscriber_threads,
self.fanout,
self.fanout_threads,
self.publish_rate,
self.queue_depth,
SEND.value(), RECV_OK.value(), DROPPED.value())
} else {
write!(f, "publishers: {} subscribers: {} fanout: {} threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}",
self.publishers,
self.subscribers,
self.fanout,
self.threads,
self.publish_rate,
self.queue_depth,
SEND.value(), RECV_OK.value(), DROPPED.value())
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Config;
use crate::*;

use futures_lite::StreamExt;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Config;
use crate::*;

use ratelimit::Ratelimiter;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Config;
use crate::*;

use ratelimit::Ratelimiter;
Expand Down
Loading