From 8ed48649d63c8393fd47442d939bb41858f79466 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Thu, 4 May 2023 20:56:08 +0800 Subject: [PATCH] Add metrics exporter (statsd) --- .vscode/settings.json | 11 +- Cargo.lock | 232 +++++++++++++++++++++++++++++++++ Cargo.toml | 27 +++- src/lib.rs | 279 +++++++++++++++++++++++++++++++++------- src/main.rs | 36 +++++- src/metrics_exporter.rs | 226 ++++++++++++++++++++++++++++++++ src/report.rs | 25 ++++ 7 files changed, 779 insertions(+), 57 deletions(-) create mode 100644 src/metrics_exporter.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 0e9b52a..1c76d29 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,12 +1,19 @@ { "editor.rulers": [85], - "rust-analyzer.cargo.features": ["hashlink", "mini-moka", "quick_cache", "stretto"], + "rust-analyzer.cargo.features": [ + "hashlink", + "mini-moka", + "quick_cache", + "stretto", + "metrics" + ], "rust-analyzer.server.extraEnv": { "CARGO_TARGET_DIR": "target/ra" }, "cSpell.words": [ "clippy", "Dharmendra", + "dogstatsd", "hashbrown", "Hasher", "hashlink", @@ -15,6 +22,8 @@ "moka", "mokabench", "oltp", + "pointee", + "Statsd", "thiserror", "Toolchain", "unsync" diff --git a/Cargo.lock b/Cargo.lock index d18d7fa..1503d09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,6 +31,12 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "async-io" version = "1.13.0" @@ -408,6 +414,9 @@ name = "hashbrown" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash 0.8.3", +] [[package]] name = "hashlink" @@ -581,6 +590,57 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa8ebbd1a9e57bbab77b9facae7f5136aea44c356943bf9a198f647da64285d6" +dependencies = [ + "ahash 0.8.3", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-dogstatsd" +version = "0.7.0" +source = "git+https://github.com/tatsuya6502/metrics-exporter-dogstatsd?branch=metrics-v0.21#42334fb55890783d5f984ae7a9912dc1c0d1d1d6" +dependencies = [ + "indexmap", + "metrics", + "metrics-util", + "quanta 0.11.0", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-macros" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.15", +] + +[[package]] +name = "metrics-util" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "111cb375987443c3de8d503580b536f77dc8416d32db62d9456db5d93bd7ac47" +dependencies = [ + "crossbeam-epoch 0.9.14", + "crossbeam-utils 0.8.15", + "hashbrown 0.13.2", + "metrics", + "num_cpus", + "quanta 0.11.0", + "sketches-ddsketch", +] + [[package]] name = "mini-moka" version = "0.10.0" @@ -596,6 +656,18 @@ dependencies = [ "triomphe", ] +[[package]] +name = "mio" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +dependencies = [ + "libc", + "log", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.45.0", +] + [[package]] name = "moka" version = "0.8.6" @@ -706,26 +778,45 @@ name = "mokabench" version = "0.9.0" dependencies = [ "anyhow", + "arc-swap", "async-io", "async-trait", "clap", "crossbeam-channel", + "crossbeam-epoch 0.9.14", "futures-util", "hashlink", "itertools", + "metrics", + "metrics-exporter-dogstatsd", "mini-moka", "moka 0.10.2", "moka 0.11.0", "moka 0.8.6", "moka 0.9.7", + "once_cell", "parking_lot", "quick_cache", "stretto", "thiserror", + "tikv-jemalloc-ctl", + "tikv-jemallocator", "tokio", + "tracing", + "tracing-subscriber", "xxhash-rust", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num_cpus" version = "1.15.0" @@ -748,6 +839,12 @@ version = "6.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking" version = "2.1.0" @@ -777,6 +874,12 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "paste" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -805,6 +908,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "portable-atomic" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f602a0d1e09a48e4f8e8b4d4042e32807c3676da31f2ecabeac9f96226ec6c45" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1039,6 +1148,15 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "skeptic" version = "0.13.7" @@ -1054,6 +1172,12 @@ dependencies = [ "walkdir", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a406c1882ed7f29cd5e248c9848a80e7cb6ae0fea82346d2746f2f941c07e1" + [[package]] name = "slab" version = "0.4.8" @@ -1183,6 +1307,47 @@ dependencies = [ "syn 2.0.15", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e37706572f4b151dff7a0146e040804e9c26fe3a3118591112f05cf12a4216c1" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.5.3+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a678df20055b43e57ef8cddde41cdfda9a3c1a060b67f4c5836dfb1d78543ba8" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20612db8a13a6c06d57ec83953694185a367e16945f66565e8028d2c0bd76979" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "tokio" version = "1.28.0" @@ -1190,8 +1355,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f" dependencies = [ "autocfg", + "libc", + "mio", "num_cpus", "pin-project-lite", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -1207,6 +1375,64 @@ dependencies = [ "syn 2.0.15", ] +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if 1.0.0", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.15", +] + +[[package]] +name = "tracing-core" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + [[package]] name = "triomphe" version = "0.1.8" @@ -1241,6 +1467,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index 1a0feac..1bc38c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,21 +2,26 @@ name = "mokabench" version = "0.9.0" edition = "2021" +rust-version = "1.66" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = ["moka-v011"] -moka-v011 = ["moka011"] -moka-v010 = ["moka010"] -moka-v09 = ["moka09"] -moka-v08 = ["moka08"] +# Cache implementations +moka-v011 = ["dep:moka011"] +moka-v010 = ["dep:moka010"] +moka-v09 = ["dep:moka09"] +moka-v08 = ["dep:moka08"] hashlink = ["dep:hashlink"] mini-moka = ["dep:mini-moka"] quick_cache = ["dep:quick_cache"] stretto = ["dep:stretto"] +# Metrics support +metrics = ["dep:crossbeam-epoch", "dep:metrics", "dep:metrics-exporter-dogstatsd", "dep:once_cell", "dep:tracing", "dep:tracing-subscriber"] + [dependencies] anyhow = "1.0.56" async-io = "1.12.0" @@ -38,6 +43,16 @@ mini-moka = { optional = true, version = "0.10.0" } quick_cache = { optional = true, version = "0.2.1" } stretto = { optional = true, version = "0.7.1" } +# Metrics support +crossbeam-epoch = { optional = true, version = "0.9.14" } +metrics = { optional = true, version = "0.21.0" } +# metrics-exporter-dogstatsd = { optional = true, version = "0.7.0" } +metrics-exporter-dogstatsd = { optional = true, git = "https://github.com/tatsuya6502/metrics-exporter-dogstatsd", branch = "metrics-v0.21" } +once_cell = { optional = true, version = "1.17.1" } +tracing = { optional = true, version = "0.1.37" } +tracing-subscriber = { optional = true, version = "0.3.16" } +arc-swap = "1.6.0" + [dependencies.moka011] package = "moka" optional = true @@ -69,3 +84,7 @@ features = ["future", "dash"] # debug=true # debug-assertions=true # overflow-checks = true + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5.0" +tikv-jemalloc-ctl = "0.5.0" diff --git a/src/lib.rs b/src/lib.rs index 9d38328..d1ab82b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,14 +3,14 @@ any(feature = "moka-v010", feature = "moka-v09", feature = "moka-v08") ))] compile_error!( - "You cannot enable `moka-v010`, `moka-v09` and/or `moka-v8` features while `moka-v011` is enabled.\n\ + "You cannot enable `moka-v010`, `moka-v09` and/or `moka-v08` features while `moka-v011` is enabled.\n\ You might need `--no-default-features`." ); use std::io::prelude::*; -use std::sync::Arc; -use std::{fs::File, io::BufReader, time::Instant}; +use std::{fs::File, io::BufReader, sync::Arc, time::Instant}; +use crossbeam_channel::Sender; #[cfg(feature = "moka-v011")] pub(crate) use moka011 as moka; @@ -27,6 +27,7 @@ mod cache; pub mod config; mod eviction_counters; mod load_gen; +mod metrics_exporter; mod parser; mod report; mod trace_file; @@ -43,6 +44,10 @@ use cache::{ }; use config::Config; use itertools::Itertools; +use metrics_exporter::{ + MetricsNames, METRICS_NAMES_MOKA_ASYNC_CACHE, METRICS_NAMES_MOKA_SYNC_CACHE, + METRICS_NAMES_MOKA_SYNC_SEG_CACHE, +}; use parser::TraceEntry; use report::ReportBuilder; @@ -69,34 +74,57 @@ pub(crate) enum Command { Iterate, } -pub fn run_multi_threads_moka_sync( +pub async fn run_multi_threads_moka_sync( config: &Config, capacity: usize, num_clients: u16, ) -> anyhow::Result { + metrics_exporter::init("::1:8125").await; + let max_cap = if config.size_aware { capacity as u64 * 2u64.pow(15) } else { capacity as u64 }; let report_builder = ReportBuilder::new("Moka Sync Cache", max_cap, Some(num_clients)); + let metrics_names = Some(METRICS_NAMES_MOKA_SYNC_CACHE); #[cfg(not(any(feature = "moka-v08", feature = "moka-v09")))] if config.entry_api { let cache_driver = MokaSyncCache::with_entry_api(config, max_cap, capacity); - return run_multi_threads(config, num_clients, cache_driver, report_builder); + let result = run_multi_threads( + config, + num_clients, + cache_driver, + report_builder, + metrics_names, + false, + ); + metrics_exporter::shutdown(); + return result; } let cache_driver = MokaSyncCache::new(config, max_cap, capacity); - run_multi_threads(config, num_clients, cache_driver, report_builder) + let result = run_multi_threads( + config, + num_clients, + cache_driver, + report_builder, + metrics_names, + false, + ); + metrics_exporter::shutdown(); + result } -pub fn run_multi_threads_moka_segment( +pub async fn run_multi_threads_moka_segment( config: &Config, capacity: usize, num_clients: u16, num_segments: usize, ) -> anyhow::Result { + metrics_exporter::init("::1:8125").await; + let max_cap = if config.size_aware { capacity as u64 * 2u64.pow(15) } else { @@ -104,16 +132,35 @@ pub fn run_multi_threads_moka_segment( }; let report_name = format!("Moka SegmentedCache({num_segments})"); let report_builder = ReportBuilder::new(&report_name, max_cap, Some(num_clients)); + let metrics_names: Option = Some(METRICS_NAMES_MOKA_SYNC_SEG_CACHE); #[cfg(not(any(feature = "moka-v08", feature = "moka-v09")))] if config.entry_api { let cache_driver = MokaSegmentedCache::with_entry_api(config, max_cap, capacity, num_segments); - return run_multi_threads(config, num_clients, cache_driver, report_builder); + let result = run_multi_threads( + config, + num_clients, + cache_driver, + report_builder, + metrics_names, + false, + ); + metrics_exporter::shutdown(); + return result; } let cache_driver = MokaSegmentedCache::new(config, max_cap, capacity, num_segments); - run_multi_threads(config, num_clients, cache_driver, report_builder) + let result = run_multi_threads( + config, + num_clients, + cache_driver, + report_builder, + metrics_names, + false, + ); + metrics_exporter::shutdown(); + result } pub async fn run_multi_tasks_moka_async( @@ -121,21 +168,44 @@ pub async fn run_multi_tasks_moka_async( capacity: usize, num_clients: u16, ) -> anyhow::Result { + metrics_exporter::init("::1:8125").await; + let max_cap = if config.size_aware { capacity as u64 * 2u64.pow(15) } else { capacity as u64 }; let report_builder = ReportBuilder::new("Moka Async Cache", max_cap, Some(num_clients)); + let metrics_names: Option = Some(METRICS_NAMES_MOKA_ASYNC_CACHE); #[cfg(not(any(feature = "moka-v08", feature = "moka-v09")))] if config.entry_api { let cache_driver = MokaAsyncCache::with_entry_api(config, max_cap, capacity); - return run_multi_tasks(config, num_clients, cache_driver, report_builder).await; + let result = run_multi_tasks( + config, + num_clients, + cache_driver, + report_builder, + metrics_names, + false, + ) + .await; + metrics_exporter::shutdown(); + return result; } let cache_driver = MokaAsyncCache::new(config, max_cap, capacity); - run_multi_tasks(config, num_clients, cache_driver, report_builder).await + let result = run_multi_tasks( + config, + num_clients, + cache_driver, + report_builder, + metrics_names, + false, + ) + .await; + metrics_exporter::shutdown(); + result } #[cfg(any(feature = "mini-moka", feature = "moka-v08", feature = "moka-v09"))] @@ -156,7 +226,14 @@ pub fn run_multi_threads_moka_dash( "Moka Dash Cache" }; let report_builder = ReportBuilder::new(report_name, max_cap, Some(num_clients)); - run_multi_threads(config, num_clients, cache_driver, report_builder) + run_multi_threads( + config, + num_clients, + cache_driver, + report_builder, + None, + true, + ) } #[cfg(feature = "hashlink")] @@ -168,7 +245,14 @@ pub fn run_multi_threads_hashlink( let cache_driver = HashLink::new(config, capacity); let report_builder = ReportBuilder::new("HashLink (LRU w/ Mutex)", capacity as _, Some(num_clients)); - run_multi_threads(config, num_clients, cache_driver, report_builder) + run_multi_threads( + config, + num_clients, + cache_driver, + report_builder, + None, + true, + ) } #[cfg(feature = "quick_cache")] @@ -185,7 +269,14 @@ pub fn run_multi_threads_quick_cache( let cache_driver = QuickCache::new(config, capacity, max_cap); let report_builder = ReportBuilder::new("QuickCache Sync Cache", capacity as _, Some(num_clients)); - run_multi_threads(config, num_clients, cache_driver, report_builder) + run_multi_threads( + config, + num_clients, + cache_driver, + report_builder, + None, + true, + ) } #[cfg(feature = "stretto")] @@ -196,7 +287,14 @@ pub fn run_multi_threads_stretto( ) -> anyhow::Result { let cache_driver = StrettoCache::new(config, capacity); let report_builder = ReportBuilder::new("Stretto", capacity as _, Some(num_clients)); - run_multi_threads(config, num_clients, cache_driver, report_builder) + run_multi_threads( + config, + num_clients, + cache_driver, + report_builder, + None, + true, + ) } #[cfg(any(feature = "mini-moka", feature = "moka-v08", feature = "moka-v09"))] @@ -243,26 +341,26 @@ fn run_multi_threads( num_clients: u16, cache_driver: impl CacheDriver + Clone + Send + 'static, report_builder: ReportBuilder, + metrics_names: Option, + pre_process_all_commands: bool, ) -> anyhow::Result { let report_builder = Arc::new(report_builder); - let (send, receive) = crossbeam_channel::unbounded::>(); + let (send, receive) = if pre_process_all_commands { + crossbeam_channel::unbounded::>() + } else { + crossbeam_channel::bounded(100) + }; - // In order to have the minimum harness overhead and not have many consumers - // waiting for the single producer, we buffer all operations in a channel. - let mut counter = 0; - for _ in 0..(config.repeat.unwrap_or(1)) { - let f = File::open(config.trace_file.path())?; - let reader = BufReader::new(f); - for chunk in reader.lines().enumerate().chunks(BATCH_SIZE).into_iter() { - let chunk = chunk.map(|(i, r)| r.map(|s| (i, s))); - let commands = load_gen::generate_commands(config, BATCH_SIZE, &mut counter, chunk)?; - send.send(commands)?; - } + if pre_process_all_commands { + // In order to have the minimum harness overhead and not have many consumers + // waiting for the single producer, we buffer all operations in a channel. + // https://github.com/moka-rs/mokabench/pull/6 + generate_and_send_commands(config, &send)?; + } else { + // Read the whole trace file to prime the disk cache of the filesystem. + read_trace_file(config)?; } - // Drop the sender channel to notify the workers that we are finished. - std::mem::drop(send); - let instant = Instant::now(); let handles = (0..num_clients) .map(|_| { @@ -272,14 +370,46 @@ fn run_multi_threads( std::thread::spawn(move || { let mut report = rb.build(); + #[allow(unused)] + let mut prev_report = report.clone(); + #[allow(unused)] + let mut prev_epoch = 0; + while let Ok(commands) = ch.recv() { cache::process_commands(commands, &mut cache, &mut report); + + #[cfg(feature = "metrics")] + { + let epoch = metrics_exporter::current_epoch(); + if epoch > prev_epoch { + if let Some(names) = metrics_names { + let diff = report.diff(&prev_report); + prev_report = report.clone(); + prev_epoch = epoch; + metrics_exporter::report_stats(&names, &diff); + } + } + } + } + + #[cfg(feature = "metrics")] + if let Some(names) = metrics_names { + let diff = report.diff(&prev_report); + metrics_exporter::report_stats(&names, &diff); } + report }) }) .collect::>(); + if !pre_process_all_commands { + generate_and_send_commands(config, &send)?; + } + + // Drop the sender channel to notify the workers that we are finished. + std::mem::drop(send); + // Wait for the workers to finish and collect their reports. let reports = handles .into_iter() @@ -304,26 +434,26 @@ async fn run_multi_tasks( num_clients: u16, cache_driver: impl AsyncCacheDriver + Clone + Send + 'static, report_builder: ReportBuilder, + metrics_names: Option, + pre_process_all_commands: bool, ) -> anyhow::Result { let report_builder = Arc::new(report_builder); - let (send, receive) = crossbeam_channel::unbounded::>(); + let (send, receive) = if pre_process_all_commands { + crossbeam_channel::unbounded::>() + } else { + crossbeam_channel::bounded(100) + }; - // In order to have the minimum harness overhead and not have many consumers - // waiting for the single producer, we buffer all operations in a channel. - let mut counter = 0; - for _ in 0..(config.repeat.unwrap_or(1)) { - let f = File::open(config.trace_file.path())?; - let reader = BufReader::new(f); - for chunk in reader.lines().enumerate().chunks(BATCH_SIZE).into_iter() { - let chunk = chunk.map(|(i, r)| r.map(|s| (i, s))); - let commands = load_gen::generate_commands(config, BATCH_SIZE, &mut counter, chunk)?; - send.send(commands)?; - } + if pre_process_all_commands { + // In order to have the minimum harness overhead and not have many consumers + // waiting for the single producer, we buffer all operations in a channel. + // https://github.com/moka-rs/mokabench/pull/6 + generate_and_send_commands(config, &send)?; + } else { + // Read the whole trace file to prime the disk cache of the filesystem. + read_trace_file(config)?; } - // Drop the sender channel to notify the workers that we are finished. - std::mem::drop(send); - let instant = Instant::now(); let handles = (0..num_clients) .map(|_| { @@ -333,14 +463,46 @@ async fn run_multi_tasks( tokio::task::spawn(async move { let mut report = rb.build(); + #[allow(unused)] + let mut prev_report = report.clone(); + #[allow(unused)] + let mut prev_epoch = 0; + while let Ok(commands) = ch.recv() { cache::process_commands_async(commands, &mut cache, &mut report).await; + + #[cfg(feature = "metrics")] + { + let epoch = metrics_exporter::current_epoch(); + if epoch > prev_epoch { + if let Some(names) = metrics_names { + let diff = report.diff(&prev_report); + prev_report = report.clone(); + prev_epoch = epoch; + metrics_exporter::report_stats(&names, &diff); + } + } + } + } + + #[cfg(feature = "metrics")] + if let Some(names) = metrics_names { + let diff = report.diff(&prev_report); + metrics_exporter::report_stats(&names, &diff); } + report }) }) .collect::>(); + if !pre_process_all_commands { + generate_and_send_commands(config, &send)?; + } + + // Drop the sender channel to notify the workers that we are finished. + std::mem::drop(send); + // Wait for the workers to finish and collect their reports. let reports = futures_util::future::join_all(handles).await; let elapsed = instant.elapsed(); @@ -358,3 +520,30 @@ async fn run_multi_tasks( Ok(report) } + +fn generate_and_send_commands( + config: &Config, + send_channel: &Sender>, +) -> anyhow::Result<()> { + let mut counter = 0; + for _ in 0..(config.repeat.unwrap_or(1)) { + let f = File::open(config.trace_file.path())?; + let reader = BufReader::new(f); + for chunk in reader.lines().enumerate().chunks(BATCH_SIZE).into_iter() { + let chunk = chunk.map(|(i, r)| r.map(|s| (i, s))); + let commands = load_gen::generate_commands(config, BATCH_SIZE, &mut counter, chunk)?; + send_channel.send(commands)?; + } + } + + Ok(()) +} + +/// Read the whole trace file to prime the disk cache of the filesystem. +fn read_trace_file(config: &Config) -> anyhow::Result<()> { + let f = File::open(config.trace_file.path())?; + let reader = BufReader::new(f); + std::hint::black_box(reader.lines().count()); + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 7150b8e..2d21666 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,9 +7,34 @@ use mokabench::{ use clap::{Arg, Command}; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +#[cfg(target_env = "msvc")] +compile_error!("Sorry, Windows MSVC target is not supported because we cannot use jemalloc there"); + #[tokio::main] async fn main() -> anyhow::Result<()> { + #[cfg(feature = "metrics")] + { + // Show tracing logs on the console so that we can see any errors in + // metrics_exporter_dogstatsd crate. + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::INFO) + .finish(); + + tracing::subscriber::set_global_default(subscriber) + .expect("setting default subscriber failed"); + + // tracing::info!("tracing_subscriber::FmtSubscriber enabled"); + } + let (trace_files, mut config) = create_config()?; + for trace_file in trace_files { config.trace_file = trace_file; println!("{config:?}"); @@ -94,7 +119,7 @@ async fn run_with_capacity(config: &Config, capacity: usize) -> anyhow::Result<( } for num_clients in num_clients_slice { - let report = mokabench::run_multi_threads_moka_sync(config, capacity, *num_clients)?; + let report = mokabench::run_multi_threads_moka_sync(config, capacity, *num_clients).await?; println!("{}", report.to_csv_record()); } @@ -112,12 +137,9 @@ async fn run_with_capacity(config: &Config, capacity: usize) -> anyhow::Result<( let num_segments = 8; for num_clients in num_clients_slice { - let report = mokabench::run_multi_threads_moka_segment( - config, - capacity, - *num_clients, - num_segments, - )?; + let report = + mokabench::run_multi_threads_moka_segment(config, capacity, *num_clients, num_segments) + .await?; println!("{}", report.to_csv_record()); } diff --git a/src/metrics_exporter.rs b/src/metrics_exporter.rs new file mode 100644 index 0000000..952d1fd --- /dev/null +++ b/src/metrics_exporter.rs @@ -0,0 +1,226 @@ +use std::{ + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, RwLock, + }, + thread::JoinHandle, + time::Duration, +}; + +use metrics::{counter, register_gauge, Gauge}; +use once_cell::sync::Lazy; + +#[cfg(feature = "metrics")] +use metrics_exporter_dogstatsd::StatsdBuilder; + +use crate::Report; + +static EPOCH: AtomicU64 = AtomicU64::new(0); + +pub(crate) async fn init(endpoint: &str) { + let mut shared = MetricsExporter::shared().write().unwrap(); + if shared.is_initialized { + return; + } + + #[cfg(feature = "metrics")] + shared.do_init(endpoint).await; + + shared.is_initialized = true; +} + +pub(crate) fn shutdown() { + // TODO: FIXME: "this `MutexGuard` is held across an `await` point" + // https://rust-lang.github.io/rust-clippy/master/index.html#await_holding_lock + let mut shared = MetricsExporter::shared().write().unwrap(); + + #[cfg(feature = "metrics")] + shared.do_shutdown(); + + shared.is_initialized = false; +} + +pub(crate) fn report_stats(metrics_names: &MetricsNames, diff: &Report) { + #[cfg(feature = "metrics")] + { + let shared = MetricsExporter::shared().read().unwrap(); + shared.do_report_stats(metrics_names, diff); + } +} + +pub(crate) fn current_epoch() -> u64 { + EPOCH.load(Ordering::Relaxed) +} + +static SHARED: Lazy> = Lazy::new(|| RwLock::new(MetricsExporter::new())); + +struct MetricsExporter { + is_initialized: bool, + is_metrics_installed: bool, + reporter: Option>, + reporter_thread: Option>, +} + +impl MetricsExporter { + fn new() -> Self { + Self { + is_initialized: false, + is_metrics_installed: false, + reporter: None, + reporter_thread: None, + } + } + + fn shared() -> &'static RwLock { + &SHARED + } +} + +#[cfg(feature = "metrics")] +impl MetricsExporter { + async fn do_init(&mut self, endpoint: &str) { + if !self.is_metrics_installed { + StatsdBuilder::new() + // Cannot send to IPV4 address on macOS if it is bound to an IPV6 address. + // https://users.rust-lang.org/t/udpsocket-connect-fails-on-unspecified-host/69100 + // https://github.com/dialtone/metrics-exporter-dogstatsd/pull/3 + .with_push_gateway(endpoint, Duration::from_millis(498)) + .expect("Failed to set the push gateway to statsd exporter") + .set_global_prefix("mokabench") + .install() + .expect("Failed to install statsd exporter"); + + self.is_metrics_installed = true; + } + + let alloc_reporter = Arc::new(AllocationReporter::default()); + let alloc_reporter2 = Arc::clone(&alloc_reporter); + + // TODO: Give a name to the thread. + let t = std::thread::spawn(move || { + alloc_reporter2.run(); + }); + + self.reporter = Some(alloc_reporter); + self.reporter_thread = Some(t); + } + + fn do_shutdown(&mut self) { + match (self.reporter.take(), self.reporter_thread.take()) { + (Some(r), Some(h)) => { + r.shutdown(); + h.join().unwrap(); + } + (None, None) => {} + _ => unreachable!(), + } + } + + fn do_report_stats(&self, metrics_names: &MetricsNames, diff: &Report) { + counter!(metrics_names.insert_count, diff.insert_count); + counter!(metrics_names.read_count, diff.read_count); + counter!(metrics_names.hit_count, diff.hit_count); + counter!(metrics_names.invalidation_count, diff.invalidation_count); + } +} + +#[derive(Clone, Copy, Debug)] +pub(crate) struct MetricsNames { + pub(crate) insert_count: &'static str, + pub(crate) read_count: &'static str, + pub(crate) hit_count: &'static str, + pub(crate) invalidation_count: &'static str, +} + +pub(crate) static METRICS_NAMES_MOKA_SYNC_CACHE: MetricsNames = MetricsNames { + insert_count: "moka.sync.insert_count", + read_count: "moka.sync.read_count", + hit_count: "moka.sync.hit_count", + invalidation_count: "moka.sync.invalidation_count", +}; + +pub(crate) static METRICS_NAMES_MOKA_SYNC_SEG_CACHE: MetricsNames = MetricsNames { + insert_count: "moka.sync_seg.insert_count", + read_count: "moka.sync_seg.read_count", + hit_count: "moka.sync_seg.hit_count", + invalidation_count: "moka.sync_seg.invalidation_count", +}; + +pub(crate) static METRICS_NAMES_MOKA_ASYNC_CACHE: MetricsNames = MetricsNames { + insert_count: "moka.async.insert_count", + read_count: "moka.async.read_count", + hit_count: "moka.async.hit_count", + invalidation_count: "moka.async.invalidation_count", +}; + +// #[cfg()] +#[derive(Default)] +struct AllocationReporter { + is_shutting_down: AtomicBool, +} + +impl AllocationReporter { + fn run(&self) { + let resident_gauge = register_gauge!("memory.resident_mb"); + let allocated_gauge = register_gauge!("memory.allocated_mb"); + + loop { + if self.is_shutting_down.load(Ordering::Acquire) { + break; + } + EPOCH.fetch_add(1, Ordering::AcqRel); + Self::report_allocation_info(&resident_gauge, &allocated_gauge); + std::thread::sleep(Duration::from_millis(98)); + } + + Self::run_deferred(); + Self::report_allocation_info(&resident_gauge, &allocated_gauge); + } + + fn shutdown(&self) { + self.is_shutting_down.store(true, Ordering::Release); + } + + fn report_allocation_info(resident_gauge: &Gauge, allocated_gauge: &Gauge) { + use tikv_jemalloc_ctl::{epoch, stats}; + + let e = epoch::mib().unwrap(); + e.advance().unwrap(); + let resident = stats::resident::read().unwrap(); + let allocated = stats::allocated::read().unwrap(); + let resident_mb = resident as f64 / 1024.0 / 1024.0; + let allocated_mb = allocated as f64 / 1024.0 / 1024.0; + + // println!("allocation,{:.4},{:.4}", resident_mb, allocated_mb); + resident_gauge.set(resident_mb); + allocated_gauge.set(allocated_mb); + } + + /// Runs deferred destructors in crossbeam-epoch and prints the current allocation + /// info. + fn run_deferred() { + use tikv_jemalloc_ctl::{epoch, stats}; + + let mut allocated = std::usize::MAX; + let mut unchanged_count = 0usize; + loop { + crossbeam_epoch::pin().flush(); + + let e = epoch::mib().unwrap(); + e.advance().unwrap(); + let new_allocated = stats::allocated::read().unwrap(); + + if new_allocated == allocated { + unchanged_count += 1; + if unchanged_count > 50 { + break; + } + } else { + allocated = new_allocated; + unchanged_count = 0; + } + + std::thread::sleep(std::time::Duration::from_millis(2)); + } + } +} diff --git a/src/report.rs b/src/report.rs index 91e2bc6..7dd8b52 100644 --- a/src/report.rs +++ b/src/report.rs @@ -63,6 +63,31 @@ impl Report { } } + /// Returns a new report with the difference between the two reports. + pub fn diff(&self, other: &Self) -> Self { + let (l, s) = if self.insert_count > other.insert_count { + (self, other) + } else { + (other, self) + }; + + let duration = match (l.duration, s.duration) { + (Some(d_l), Some(d_s)) => Some(d_l - d_s), + _ => None, + }; + + Self { + insert_count: l.insert_count - s.insert_count, + read_count: l.read_count - s.read_count, + hit_count: l.hit_count - s.hit_count, + invalidation_count: l.invalidation_count - s.invalidation_count, + eviction_count: l.eviction_count - s.eviction_count, + expiration_count: l.expiration_count - s.expiration_count, + duration, + ..self.clone() + } + } + pub(crate) fn add_eviction_counts(&mut self, eviction_counters: &EvictionCounters) { self.has_eviction_counts = true; self.invalidation_count += eviction_counters.explicit();