Skip to content

Commit

Permalink
periodic sampling of metrics to avoid flood, fixup examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Dec 17, 2024
1 parent 3c83c09 commit 2c6a9dd
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 84 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions thread-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ affinity = "0.1.2"

[dev-dependencies]
axum = "0.7.9"
env_logger = { workspace = true }
serde_json = { workspace = true }
toml = { workspace = true }
36 changes: 21 additions & 15 deletions thread-manager/examples/core_contention_basics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::{
future::IntoFuture,
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
use {
agave_thread_manager::*,
log::{debug, info},
std::{
future::IntoFuture,
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
},
};

async fn axum_main(port: u16) {
Expand Down Expand Up @@ -31,35 +35,36 @@ async fn axum_main(port: u16) {
match timeout {
Ok(v) => v.unwrap(),
Err(_) => {
println!("Terminating server on port {port}");
info!("Terminating server on port {port}");
}
}
}
use agave_thread_manager::*;

fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let experiments = [
"examples/core_contention_dedicated_set.json",
"examples/core_contention_contending_set.json",
"examples/core_contention_dedicated_set.toml",
"examples/core_contention_contending_set.toml",
];

for exp in experiments {
println!("===================");
println!("Running {exp}");
info!("===================");
info!("Running {exp}");
let mut conffile = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
conffile.push(exp);
let mut buf = String::new();
std::fs::File::open(conffile)?.read_to_string(&mut buf)?;
let cfg: RuntimeManagerConfig = toml::from_str(&buf)?;
//println!("Loaded config {}", serde_json::to_string_pretty(&cfg)?);

let rtm = ThreadManager::new(cfg).unwrap();
let tok1 = rtm
.get_tokio("axum1")
.expect("Expecting runtime named axum1");
tok1.start_metrics_sampling(Duration::from_secs(1));
let tok2 = rtm
.get_tokio("axum2")
.expect("Expecting runtime named axum2");
tok2.start_metrics_sampling(Duration::from_secs(1));

let wrk_cores: Vec<_> = (32..64).collect();
let results = std::thread::scope(|s| {
Expand All @@ -72,6 +77,7 @@ fn main() -> anyhow::Result<()> {
let jh = s.spawn(|| run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap());
jh.join().expect("WRK crashed!")
});
//print out the results of the bench run
println!("Results are: {:?}", results);
}
Ok(())
Expand Down Expand Up @@ -112,7 +118,7 @@ fn run_wrk(
let mut all_latencies = vec![];
let mut all_rps = vec![];
for (out, port) in outs.zip(ports.iter()) {
println!("=========================");
debug!("=========================");
std::io::stdout().write_all(&out.stderr)?;
let res = str::from_utf8(&out.stdout)?;
let mut res = res.lines().last().unwrap().split(' ');
Expand All @@ -122,7 +128,7 @@ fn run_wrk(

let requests: usize = res.next().unwrap().parse()?;
let rps = requests as f32 / 10.0;
println!("WRK results for port {port}: {latency:?} {rps}");
debug!("WRK results for port {port}: {latency:?} {rps}");
all_latencies.push(Duration::from_micros(latency_us));
all_rps.push(rps);
}
Expand Down
67 changes: 35 additions & 32 deletions thread-manager/examples/core_contention_sweep.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::{
collections::HashMap,
future::IntoFuture,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
use {
agave_thread_manager::*,
log::{debug, info},
std::{
collections::HashMap,
future::IntoFuture,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
},
};

async fn axum_main(port: u16) {
use axum::{routing::get, Router};

// basic handler that responds with a static string
async fn root() -> &'static str {
tokio::time::sleep(Duration::from_millis(1)).await;
Expand All @@ -24,6 +27,7 @@ async fn axum_main(port: u16) {
tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))
.await
.unwrap();
info!("Server on port {port} ready");
let timeout = tokio::time::timeout(
Duration::from_secs(11),
axum::serve(listener, app).into_future(),
Expand All @@ -32,11 +36,10 @@ async fn axum_main(port: u16) {
match timeout {
Ok(v) => v.unwrap(),
Err(_) => {
println!("Terminating server on port {port}");
info!("Terminating server on port {port}");
}
}
}
use agave_thread_manager::*;
fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
let tokio_cfg_1 = TokioConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: cc },
Expand All @@ -46,12 +49,8 @@ fn make_config_shared(cc: usize) -> RuntimeManagerConfig {
let tokio_cfg_2 = tokio_cfg_1.clone();
RuntimeManagerConfig {
tokio_configs: HashMap::from([
("tokio1".into(), tokio_cfg_1),
("tokio2".into(), tokio_cfg_2),
]),
tokio_runtime_mapping: HashMap::from([
("axum1".into(), "tokio1".into()),
("axum2".into(), "tokio2".into()),
("axum1".into(), tokio_cfg_1),
("axum2".into(), tokio_cfg_2),
]),
..Default::default()
}
Expand All @@ -75,12 +74,8 @@ fn make_config_dedicated(cc: usize) -> RuntimeManagerConfig {
};
RuntimeManagerConfig {
tokio_configs: HashMap::from([
("tokio1".into(), tokio_cfg_1),
("tokio2".into(), tokio_cfg_2),
]),
tokio_runtime_mapping: HashMap::from([
("axum1".into(), "tokio1".into()),
("axum2".into(), "tokio2".into()),
("axum1".into(), tokio_cfg_1),
("axum2".into(), tokio_cfg_2),
]),
..Default::default()
}
Expand All @@ -93,7 +88,7 @@ enum Regime {
Single,
}
impl Regime {
const VALUES: [Self; 3] = [Self::Shared, Self::Dedicated, Self::Single];
const VALUES: [Self; 3] = [Self::Dedicated, Self::Shared, Self::Single];
}

#[derive(Debug, Default, serde::Serialize)]
Expand All @@ -103,13 +98,14 @@ struct Results {
}

fn main() -> anyhow::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let mut all_results: HashMap<String, Results> = HashMap::new();
for regime in Regime::VALUES {
let mut res = Results::default();
for core_cnt in [2, 4, 8, 16] {
let rtm;
println!("===================");
println!("Running {core_cnt} cores under {regime:?}");
info!("===================");
info!("Running {core_cnt} cores under {regime:?}");
let (tok1, tok2) = match regime {
Regime::Shared => {
rtm = ThreadManager::new(make_config_shared(core_cnt)).unwrap();
Expand Down Expand Up @@ -143,24 +139,26 @@ fn main() -> anyhow::Result<()> {
let wrk_cores: Vec<_> = (32..64).collect();
let results = std::thread::scope(|s| {
s.spawn(|| {
tok1.tokio.spawn(axum_main(8888));
tok1.start_metrics_sampling(Duration::from_secs(1));
tok1.tokio.block_on(axum_main(8888));
});
let jh = match regime {
Regime::Single => s.spawn(|| {
run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 1000).unwrap()
run_wrk(&[8888, 8888], &wrk_cores, wrk_cores.len(), 3000).unwrap()
}),
_ => {
s.spawn(|| {
tok2.tokio.spawn(axum_main(8889));
tok2.start_metrics_sampling(Duration::from_secs(1));
tok2.tokio.block_on(axum_main(8889));
});
s.spawn(|| {
run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 1000).unwrap()
run_wrk(&[8888, 8889], &wrk_cores, wrk_cores.len(), 3000).unwrap()
})
}
};
jh.join().expect("WRK crashed!")
});
println!("Results are: {:?}", results);
info!("Results are: {:?}", results);
res.latencies_s.push(
results.0.iter().map(|a| a.as_secs_f32()).sum::<f32>() / results.0.len() as f32,
);
Expand All @@ -169,6 +167,8 @@ fn main() -> anyhow::Result<()> {
all_results.insert(format!("{regime:?}"), res);
std::thread::sleep(Duration::from_secs(3));
}

//print the resulting measurements so they can be e.g. plotted with matplotlib
println!("{}", serde_json::to_string_pretty(&all_results)?);

Ok(())
Expand All @@ -180,6 +180,9 @@ fn run_wrk(
threads: usize,
connections: usize,
) -> anyhow::Result<(Vec<Duration>, Vec<f32>)> {
//Sleep a bit to let axum start
std::thread::sleep(Duration::from_millis(500));

let mut script = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
script.push("examples/report.lua");
let cpus: Vec<String> = cpus.iter().map(|c| c.to_string()).collect();
Expand Down Expand Up @@ -209,7 +212,7 @@ fn run_wrk(
let mut all_latencies = vec![];
let mut all_rps = vec![];
for (out, port) in outs.zip(ports.iter()) {
println!("=========================");
debug!("=========================");
std::io::stdout().write_all(&out.stderr)?;
let res = str::from_utf8(&out.stdout)?;
let mut res = res.lines().last().unwrap().split(' ');
Expand All @@ -219,7 +222,7 @@ fn run_wrk(

let requests: usize = res.next().unwrap().parse()?;
let rps = requests as f32 / 10.0;
println!("WRK results for port {port}: {latency:?} {rps}");
debug!("WRK results for port {port}: {latency:?} {rps}");
all_latencies.push(Duration::from_micros(latency_us));
all_rps.push(rps);
}
Expand Down
21 changes: 19 additions & 2 deletions thread-manager/src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use {
serde::{Deserialize, Serialize},
std::sync::OnceLock,
thread_priority::ThreadExt,
};

static CORE_COUNT: OnceLock<usize> = OnceLock::new();

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub enum CoreAllocation {
///Use OS default allocation (i.e. do not alter core affinity)
Expand All @@ -17,17 +20,31 @@ pub enum CoreAllocation {
impl CoreAllocation {
/// Converts into a vector of core IDs. OsDefault is converted to empty vector.
pub fn as_core_mask_vector(&self) -> Vec<usize> {
let core_count = CORE_COUNT.get_or_init(num_cpus::get);
match *self {
CoreAllocation::PinnedCores { min, max } => (min..max).collect(),
CoreAllocation::DedicatedCoreSet { min, max } => (min..max).collect(),
CoreAllocation::OsDefault => vec![],
CoreAllocation::OsDefault => Vec::from_iter(0..*core_count),
}
}
}

#[cfg(target_os = "linux")]
pub fn set_thread_affinity(cores: &[usize]) {
affinity::set_thread_affinity(cores).expect("Can not set thread affinity for runtime worker");
assert!(
!cores.is_empty(),
"Can not call setaffinity with empty cores mask"
);
if let Err(e) = affinity::set_thread_affinity(cores) {
let thread = std::thread::current();
let msg = format!(
"Can not set core affinity {:?} for thread {:?} named {:?}, error {e}",
cores,
thread.id(),
thread.name()
);
panic!("{}", msg);
}
}

#[cfg(not(target_os = "linux"))]
Expand Down
Loading

0 comments on commit 2c6a9dd

Please sign in to comment.