diff --git a/scylla/src/transport/histogram/config.rs b/scylla/src/transport/histogram/config.rs deleted file mode 100644 index 5188a41786..0000000000 --- a/scylla/src/transport/histogram/config.rs +++ /dev/null @@ -1,201 +0,0 @@ -/// This file is a slightly adapted version of `config.rs` from the `histogram` -/// crate released under MIT License. -use core::ops::RangeInclusive; - -/// The configuration of a histogram which determines the bucketing strategy and -/// therefore the relative error and memory utilization of a histogram. -/// * `grouping_power` - controls the number of buckets that are used to span -/// consecutive powers of two. Lower values result in less memory usage since -/// fewer buckets will be created. However, this will result in larger -/// relative error as each bucket represents a wider range of values. -/// * `max_value_power` - controls the largest value which can be stored in the -/// histogram. `2^(max_value_power) - 1` is the inclusive upper bound for the -/// representable range of values. -/// -/// # How to choose parameters for your data -/// Please see for an -/// in-depth discussion about the bucketing strategy and an interactive -/// calculator that lets you explore how these parameters result in histograms -/// with varying error guarantees and memory utilization requirements. -/// -/// # The short version -/// ## Grouping Power -/// `grouping_power` should be set such that `2^(-1 * grouping_power)` is an -/// acceptable relative error. Rephrased, we can plug-in the acceptable -/// relative error into `grouping_power = ceil(log2(1/e))`. For example, if we -/// want to limit the error to 0.1% (0.001) we should set `grouping_power = 7`. -/// -/// ## Max Value Power -/// `max_value_power` should be the closest power of 2 that is larger than the -/// largest value you expect in your data. If your only guarantee is that the -/// values are all `u64`, then setting this to `64` may be reasonable if you -/// can tolerate a bit of relative error. -/// -/// ## Resulting size -/// -/// If we want to allow any value in a range of unsigned types, the amount of -/// memory for the histogram is approximately: -/// -/// | power | error | u16 | u32 | u64 | -/// |-------|-------|---------|---------|---------| -/// | 2 | 25% | 0.6 KiB | 1 KiB | 2 KiB | -/// | 3 | 12.5% | 1 KiB | 2 KiB | 4 KiB | -/// | 4 | 6.25% | 2 KiB | 4 KiB | 8 KiB | -/// | 5 | 3.13% | 3 KiB | 7 KiB | 15 KiB | -/// | 6 | 1.56% | 6 KiB | 14 KiB | 30 KiB | -/// | 7 | .781% | 10 KiB | 26 KiB | 58 KiB | -/// | 8 | .391% | 18 KiB | 50 KiB | 114 KiB | -/// | 9 | .195% | 32 KiB | 96 KiB | 224 KiB | -/// | 10 | .098% | 56 KiB | 184 KiB | 440 KiB | -/// | 11 | .049% | 96 KiB | 352 KiB | 864 KiB | -/// | 12 | .025% | 160 KiB | 672 KiB | 1.7 MiB | -/// -/// # Constraints: -/// * `max_value_power` must be in the range `0..=64` -/// * `max_value_power` must be greater than `grouping_power -#[derive(Clone, Copy, Debug, PartialEq)] -pub struct Config { - max: u64, - grouping_power: u8, - max_value_power: u8, - cutoff_power: u8, - cutoff_value: u64, - lower_bin_count: u32, - upper_bin_divisions: u32, - upper_bin_count: u32, -} - -impl Config { - /// Create a new histogram `Config` from the parameters. See the struct - /// documentation [`crate::Config`] for the meaning of the parameters and - /// their constraints. 
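To make the `grouping_power` rule above concrete, here is a small standalone sketch (not part of the driver; the helper name is illustrative) that derives the power from a target relative error and checks it against the table:

```rust
// Illustrative sketch: derive `grouping_power` from a target relative
// error, per `grouping_power = ceil(log2(1/e))`.
fn grouping_power_for(relative_error: f64) -> u8 {
    (1.0 / relative_error).log2().ceil() as u8
}

fn main() {
    // A target error of 1/128 (~0.78%) needs grouping_power = 7,
    // matching the `power = 7` row of the table above.
    assert_eq!(grouping_power_for(1.0 / 128.0), 7);
    // The worst-case relative error for a given power is 2^(-power).
    assert!((2f64.powi(-7) - 0.0078125).abs() < 1e-12);
}
```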
- pub const fn new(grouping_power: u8, max_value_power: u8) -> Result { - // we only allow values up to 2^64 - if max_value_power > 64 { - return Err("max_value_power too high"); - } - - // check that the other parameters make sense together - if grouping_power >= max_value_power { - return Err("max_value_power too low"); - } - - // the cutoff is the point at which the linear range divisions and the - // logarithmic range subdivisions diverge. - // - // for example: - // when a = 0, the linear range has bins with width 1. - // if b = 7 the logarithmic range has 128 subdivisions. - // this means that for 0..128 we must be representing the values exactly - // but we also represent 128..256 exactly since the subdivisions divide - // that range into bins with the same width as the linear portion. - // - // therefore our cutoff power = a + b + 1 - - // note: because a + b must be less than n which is a u8, a + b + 1 must - // be less than or equal to u8::MAX. This means our cutoff power will - // always fit in a u8 - let cutoff_power = grouping_power + 1; - let cutoff_value = 2_u64.pow(cutoff_power as u32); - let lower_bin_width = 2_u32.pow(0); - let upper_bin_divisions = 2_u32.pow(grouping_power as u32); - - let max = if max_value_power == 64 { - u64::MAX - } else { - 2_u64.pow(max_value_power as u32) - }; - - let lower_bin_count = (cutoff_value / lower_bin_width as u64) as u32; - let upper_bin_count = (max_value_power - cutoff_power) as u32 * upper_bin_divisions; - - Ok(Self { - max, - grouping_power, - max_value_power, - cutoff_power, - cutoff_value, - lower_bin_count, - upper_bin_divisions, - upper_bin_count, - }) - } - - /// Returns the grouping power that was used to create this configuration. - pub const fn grouping_power(&self) -> u8 { - self.grouping_power - } - - /// Returns the max value power that was used to create this configuration. - pub const fn max_value_power(&self) -> u8 { - self.max_value_power - } - - /// Returns the relative error (in percentage) of this configuration. This - /// only applies to the logarithmic bins of the histogram (linear bins have - /// a width of 1 and no error). For histograms with no logarithmic bins, - /// error for the entire histogram is zero. - pub fn error(&self) -> f64 { - match self.grouping_power == self.max_value_power - 1 { - true => 0.0, - false => 100.0 / 2_u64.pow(self.grouping_power as u32) as f64, - } - } - - /// Return the total number of buckets needed for this config. - pub const fn total_buckets(&self) -> usize { - (self.lower_bin_count + self.upper_bin_count) as usize - } - - /// Converts a value to a bucket index. Returns an error if the value is - /// outside of the range for the config. - pub(crate) fn value_to_index(&self, value: u64) -> Result { - if value < self.cutoff_value { - return Ok(value as usize); - } - - if value > self.max { - return Err("value out of range for histogram"); - } - - let power = 63 - value.leading_zeros(); - let log_bin = power - self.cutoff_power as u32; - let offset = (value - (1 << power)) >> (power - self.grouping_power as u32); - - Ok((self.lower_bin_count + log_bin * self.upper_bin_divisions + offset as u32) as usize) - } - - /// Convert a bucket index to a lower bound. 
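The cutoff and indexing arithmetic above can be hard to follow in the abstract. The following standalone sketch (with `grouping_power` fixed to 7, all names illustrative, not the crate's code) mirrors the same computation and shows which values end up sharing a bucket:

```rust
// Standalone sketch mirroring the bucketing arithmetic above for
// grouping_power = 7 (cutoff_power = 8): values below 256 are exact,
// larger values fall into buckets whose width grows with magnitude.
fn value_to_index(value: u64) -> usize {
    const GROUPING_POWER: u32 = 7;
    const CUTOFF_POWER: u32 = GROUPING_POWER + 1;
    const CUTOFF_VALUE: u64 = 1 << CUTOFF_POWER;
    const LOWER_BIN_COUNT: u32 = CUTOFF_VALUE as u32;
    const UPPER_BIN_DIVISIONS: u32 = 1 << GROUPING_POWER;

    if value < CUTOFF_VALUE {
        // Linear range: one bucket per integer value.
        return value as usize;
    }
    let power = 63 - value.leading_zeros();
    let log_bin = power - CUTOFF_POWER;
    let offset = ((value - (1 << power)) >> (power - GROUPING_POWER)) as u32;
    (LOWER_BIN_COUNT + log_bin * UPPER_BIN_DIVISIONS + offset) as usize
}

fn main() {
    // Small values are stored exactly.
    assert_eq!(value_to_index(200), 200);
    // 1000 lies in [512, 1024); that range is split into 128 bins of
    // width 4, so 1000 shares its bucket only with 1001..=1003.
    assert_eq!(value_to_index(1000), 506);
    assert_eq!(value_to_index(1003), 506);
    assert_eq!(value_to_index(1004), 507);
}
```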
- pub(crate) fn index_to_lower_bound(&self, index: usize) -> u64 { - let g = index as u64 >> self.grouping_power; - let h = index as u64 - g * (1 << self.grouping_power); - - if g < 1 { - h - } else { - (1 << (self.grouping_power as u64 + g - 1)) + (1 << (g - 1)) * h - } - } - - /// Convert a bucket index to a upper inclusive bound. - #[allow(dead_code)] - pub(crate) fn index_to_upper_bound(&self, index: usize) -> u64 { - if index as u32 == self.lower_bin_count + self.upper_bin_count - 1 { - return self.max; - } - let g = index as u64 >> self.grouping_power; - let h = index as u64 - g * (1 << self.grouping_power) + 1; - - if g < 1 { - h - 1 - } else { - (1 << (self.grouping_power as u64 + g - 1)) + (1 << (g - 1)) * h - 1 - } - } - - /// Convert a bucket index to a range. - #[allow(dead_code)] - pub(crate) fn index_to_range(&self, index: usize) -> RangeInclusive { - self.index_to_lower_bound(index)..=self.index_to_upper_bound(index) - } -} diff --git a/scylla/src/transport/histogram/lock_free_histogram.rs b/scylla/src/transport/histogram/lock_free_histogram.rs deleted file mode 100644 index 75aa61370c..0000000000 --- a/scylla/src/transport/histogram/lock_free_histogram.rs +++ /dev/null @@ -1,279 +0,0 @@ -use std::hint; -/// This file was inspired by a lock-free histogram implementation -/// from the Prometheus library in Go. -/// https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go -/// Note: in the current implementation, the histogram *may* incur a data race -/// after (1 << 63) increments (which is quite a lot). -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; - -use super::Config; - -const ORDER_TYPE: Ordering = Ordering::Relaxed; -const HIGH_BIT: u64 = 1 << 63; -const LOW_MASK: u64 = HIGH_BIT - 1; - -#[derive(Debug)] -pub struct Histogram { - /// Hot index - index of the hot pool. - /// Count - number of all started observations. - /// Use of both of these variables in one AtomicU64 - /// allows use of a lock-free algorithm. - hot_idx_and_count: AtomicU64, - /// Finished observations for each bucket pool. - pool_counts: [AtomicU64; 2], - /// Two bucket pools: hot and cold. - bucket_pools: [Box<[AtomicU64]>; 2], - /// Mutex for "writers" (logging operations) - /// (logging time is not as critical). - logger_mutex: Arc>, - /// Standard configuration for math operations. - config: Config, -} - -/// Snapshot is a structure that contains histogram statistics such as -/// min, max, mean, standard deviation, median, and most common percentiles -/// collected in a certain moment. 
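Before reading the implementation below, it may help to see the hot/cold switching idea in isolation. This is a minimal, self-contained sketch under simplified assumptions (a single counter per pool instead of histogram buckets; names are illustrative), not the driver's actual code:

```rust
// Minimal sketch of the hot/cold protocol: writers pick the hot pool
// lock-free, while the (mutex-protected) reader flips the hot bit and
// spins until in-flight writers on the now-cold pool have finished.
use std::hint;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

const HIGH_BIT: u64 = 1 << 63;
const LOW_MASK: u64 = HIGH_BIT - 1;

struct HotCold {
    // High bit: index of the hot pool; low bits: started observations.
    hot_idx_and_count: AtomicU64,
    // Finished observations per pool.
    finished: [AtomicU64; 2],
    // Observed data per pool (a single sum here for brevity).
    data: [AtomicU64; 2],
    writer: Mutex<()>,
}

impl HotCold {
    fn new() -> Self {
        HotCold {
            hot_idx_and_count: AtomicU64::new(0),
            finished: [AtomicU64::new(0), AtomicU64::new(0)],
            data: [AtomicU64::new(0), AtomicU64::new(0)],
            writer: Mutex::new(()),
        }
    }

    fn record(&self, value: u64) {
        let n = self.hot_idx_and_count.fetch_add(1, Ordering::Relaxed);
        let hot = (n >> 63) as usize;
        self.data[hot].fetch_add(value, Ordering::Relaxed);
        self.finished[hot].fetch_add(1, Ordering::Relaxed);
    }

    fn read(&self) -> u64 {
        let _guard = self.writer.lock().unwrap();
        // Flip the hot bit; the low bits are the cumulative started count,
        // and the pre-flip hot pool is now the cold pool.
        let n = self.hot_idx_and_count.fetch_add(HIGH_BIT, Ordering::Relaxed);
        let started = n & LOW_MASK;
        let cold = (n >> 63) as usize;
        // Wait for in-flight writers on the cold pool to finish.
        while self.finished[cold].load(Ordering::Relaxed) != started {
            hint::spin_loop();
        }
        let result = self.data[cold].load(Ordering::Relaxed);
        // Fold the cold pool back into the hot one and reset it.
        let hot = 1 - cold;
        let cold_finished = self.finished[cold].load(Ordering::Relaxed);
        self.finished[hot].fetch_add(cold_finished, Ordering::Relaxed);
        self.finished[cold].store(0, Ordering::Relaxed);
        self.data[hot].fetch_add(result, Ordering::Relaxed);
        self.data[cold].store(0, Ordering::Relaxed);
        result
    }
}

fn main() {
    let h = HotCold::new();
    h.record(3);
    h.record(4);
    assert_eq!(h.read(), 7);
}
```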
-#[derive(Debug)] -pub struct Snapshot { - pub min: u64, - pub max: u64, - pub mean: u64, - pub stddev: u64, - pub median: u64, - pub percentile_75: u64, - pub percentile_90: u64, - pub percentile_95: u64, - pub percentile_99: u64, - pub percentile_99_9: u64, -} - -impl Histogram { - pub fn new() -> Self { - let grouping_power = 7; - let max_value_power = 64; - let config = Config::new(grouping_power, max_value_power) - .expect("default histogram construction failure"); - - Self::with_config(&config) - } - - pub fn with_config(config: &Config) -> Self { - let mut buckets1 = Vec::with_capacity(config.total_buckets()); - buckets1.resize_with(config.total_buckets(), || AtomicU64::new(0)); - let mut buckets2 = Vec::with_capacity(config.total_buckets()); - buckets2.resize_with(config.total_buckets(), || AtomicU64::new(0)); - - Self { - hot_idx_and_count: AtomicU64::new(0), - pool_counts: [AtomicU64::new(0), AtomicU64::new(0)], - bucket_pools: [buckets1.into(), buckets2.into()], - logger_mutex: Arc::new(Mutex::new(0)), - config: *config, - } - } - - pub fn increment(&self, value: u64) -> Result<(), &'static str> { - // Increment started observations count. - let n = self.hot_idx_and_count.fetch_add(1, ORDER_TYPE); - let hot_idx = (n >> 63) as usize; - - // Increment the corresponding bucket value. - let idx = self.config.value_to_index(value)?; - self.bucket_pools[hot_idx][idx].fetch_add(1, ORDER_TYPE); - - // Increment finished observations count. - self.pool_counts[hot_idx].fetch_add(1, ORDER_TYPE); - Ok(()) - } - - pub fn mean() -> impl FnOnce(&[AtomicU64], &Config) -> Result { - |buckets, config| { - let total_count = Histogram::get_total_count(buckets); - - let mut weighted_sum = 0; - for (i, bucket) in buckets.iter().enumerate() { - // Note: we choose index_to_lower_bound here - // but that choice is arbitrary. - weighted_sum += - bucket.load(ORDER_TYPE) as u128 * config.index_to_lower_bound(i) as u128; - } - Ok((weighted_sum / total_count) as u64) - } - } - - pub fn percentile( - percentile: f64, - ) -> impl FnOnce(&[AtomicU64], &Config) -> Result { - move |buckets, config| { - if !(0.0..100.0).contains(&percentile) { - return Err("percentile out of bounds"); - } - - let total_count = Histogram::get_total_count(buckets); - let count = (percentile / 100.0 * total_count as f64).ceil() as u128; - - let mut pref_sum = 0; - for (i, bucket) in buckets.iter().enumerate() { - if pref_sum >= count { - // Note: we choose index_to_lower_bound here and after the loop - // but that choice is arbitrary. 
- return Ok(config.index_to_lower_bound(i)); - } - pref_sum += bucket.load(ORDER_TYPE) as u128; - } - Ok(config.index_to_lower_bound(buckets.len() - 1)) - } - } - - pub fn snapshot() -> impl FnOnce(&[AtomicU64], &Config) -> Result { - |buckets, config| { - let total_count = Histogram::get_total_count(buckets); - - let mut min = u64::MAX; - let mut max = 0; - let mut weighted_sum = 0; - let mut pref_sum = 0; - let mut percentile_75 = 0; - let mut percentile_90 = 0; - let mut percentile_95 = 0; - let mut percentile_99 = 0; - let mut percentile_99_9 = 0; - - let percentile_75_threshold = (0.75 * total_count as f64).ceil() as u128; - let percentile_90_threshold = (0.9 * total_count as f64).ceil() as u128; - let percentile_95_threshold = (0.95 * total_count as f64).ceil() as u128; - let percentile_99_threshold = (0.99 * total_count as f64).ceil() as u128; - let percentile_99_9_threshold = (0.999 * total_count as f64).ceil() as u128; - - for (i, bucket) in buckets.iter().enumerate() { - let count = bucket.load(ORDER_TYPE) as u128; - if count == 0 { - continue; - } - - let lower_bound = config.index_to_lower_bound(i); - let upper_bound = config.index_to_upper_bound(i); - - if lower_bound < min { - min = lower_bound; - } - if upper_bound > max { - max = upper_bound; - } - - weighted_sum += count * lower_bound as u128; - - let next_pref_sum = pref_sum + count; - if pref_sum < percentile_75_threshold && next_pref_sum >= percentile_75_threshold { - percentile_75 = lower_bound; - } - if pref_sum < percentile_90_threshold && next_pref_sum >= percentile_90_threshold { - percentile_90 = lower_bound; - } - if pref_sum < percentile_95_threshold && next_pref_sum >= percentile_95_threshold { - percentile_95 = lower_bound; - } - if pref_sum < percentile_99_threshold && next_pref_sum >= percentile_99_threshold { - percentile_99 = lower_bound; - } - if pref_sum < percentile_99_9_threshold - && next_pref_sum >= percentile_99_9_threshold - { - percentile_99_9 = lower_bound; - } - - pref_sum = next_pref_sum; - } - - let mean = (weighted_sum / total_count) as u64; - let mut variance_sum = 0; - for (i, bucket) in buckets.iter().enumerate() { - let count = bucket.load(ORDER_TYPE) as u128; - if count == 0 { - continue; - } - - let lower_bound = config.index_to_lower_bound(i); - variance_sum += count * (lower_bound as u128 - mean as u128).pow(2); - } - let variance = variance_sum / total_count; - let stddev = (variance as f64).sqrt() as u64; - - Ok(Snapshot { - min, - max, - mean, - stddev, - median: config.index_to_lower_bound(buckets.len() / 2), - percentile_75, - percentile_90, - percentile_95, - percentile_99, - percentile_99_9, - }) - } - } - - pub fn get_total_count(buckets: &[AtomicU64]) -> u128 { - buckets.iter().map(|v| v.load(ORDER_TYPE) as u128).sum() - } - - pub fn log_operation( - &self, - f: impl FnOnce(&[AtomicU64], &Config) -> Result, - ) -> Result { - // Lock the "writers" mutex. - let _guard = self.logger_mutex.lock(); - if _guard.is_err() { - return Err("couldn't lock the logger mutex"); - } - let _guard = _guard.unwrap(); - - // Switch the hot-cold index (wrapping add on highest bit). - let n = self.hot_idx_and_count.fetch_add(HIGH_BIT, ORDER_TYPE); - let started_count = n & LOW_MASK; - - let hot_idx = (n >> 63) as usize; - let cold_idx = 1 - hot_idx; - - let hot_counts = &self.pool_counts[hot_idx]; - let cold_counts = &self.pool_counts[cold_idx]; - - // Wait until the old hot observers (now working on the currently - // cold bucket pool) finish their job. 
- // (Since observer's job is fast, we can wait in a spin loop). - while started_count != cold_counts.load(ORDER_TYPE) { - hint::spin_loop(); - } - - // Now there are no active observers on the cold pool, so we can safely - // access the data without a logical race. - let result = f(&self.bucket_pools[cold_idx], &self.config); - - // Compund the cold histogram results onto the already running hot ones. - // Note that no logging operation can run now as we still hold - // the mutex, so it doesn't matter that the entire operation isn't atomic. - - // Update finished observations' counts. - hot_counts.fetch_add(cold_counts.load(ORDER_TYPE), ORDER_TYPE); - cold_counts.store(0, ORDER_TYPE); - - // Update bucket values (both pools have the same length). - for i in 0..self.bucket_pools[0].len() { - let hot_bucket = &self.bucket_pools[hot_idx][i]; - let cold_bucket = &self.bucket_pools[cold_idx][i]; - - hot_bucket.fetch_add(cold_bucket.load(ORDER_TYPE), ORDER_TYPE); - cold_bucket.store(0, ORDER_TYPE); - } - - result - } -} - -impl Default for Histogram { - fn default() -> Self { - Histogram::new() - } -} diff --git a/scylla/src/transport/histogram/mod.rs b/scylla/src/transport/histogram/mod.rs deleted file mode 100644 index 2a3025f284..0000000000 --- a/scylla/src/transport/histogram/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -mod config; -mod lock_free_histogram; - -pub use config::Config; -pub use lock_free_histogram::Histogram; -pub use lock_free_histogram::Snapshot; diff --git a/scylla/src/transport/lock_free_histogram.rs b/scylla/src/transport/lock_free_histogram.rs new file mode 100644 index 0000000000..11ada57e4c --- /dev/null +++ b/scylla/src/transport/lock_free_histogram.rs @@ -0,0 +1,291 @@ +/// This file was inspired by a lock-free histogram implementation +/// from the Prometheus library in Go. +/// https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go +/// Note: in the current implementation, the histogram *may* incur a data race +/// after (1 << 63) increments (which is quite a lot). +use histogram::{AtomicHistogram, Histogram}; +use std::hint; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use thiserror::Error; + +const ORDER_TYPE: Ordering = Ordering::Relaxed; +const HIGH_BIT: u64 = 1 << 63; +const LOW_MASK: u64 = HIGH_BIT - 1; + +#[derive(Error, Debug, PartialEq)] +pub enum LFError { + #[error("invalid use of histogram")] + HistogramErr(#[from] histogram::Error), + #[error("could not lock the snapshot mutex")] + Mutex, +} + +pub struct LockFreeHistogram { + /// Hot index - index of the hot pool. + /// Count - number of all started observations. + /// Use of both of these variables in one AtomicU64 + /// allows use of a lock-free algorithm. + hot_idx_and_count: AtomicU64, + /// Finished observations for each bucket pool. + pool_counts: [AtomicU64; 2], + /// Two histgorams in a pool: hot and cold. + histogram_pool: [AtomicHistogram; 2], + /// Mutex for "writers" (snapshot operations) + /// (snapshot time is not as critical). 
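The packed `hot_idx_and_count` word is the core trick here: the top bit selects the hot pool and the lower 63 bits count started observations, which is also why the file-level note warns about behaviour after `1 << 63` increments. A small standalone sketch of the bit manipulation (illustrative only, not the driver's code):

```rust
// The top bit selects the hot pool, the remaining 63 bits count started
// observations; flipping the hot pool is a wrapping add of the top bit.
const HIGH_BIT: u64 = 1 << 63;
const LOW_MASK: u64 = HIGH_BIT - 1;

fn main() {
    // 5 observations started, pool 0 is hot.
    let state: u64 = 5;
    assert_eq!(state >> 63, 0);
    assert_eq!(state & LOW_MASK, 5);

    // A snapshot flips the hot pool without touching the counter...
    let flipped = state.wrapping_add(HIGH_BIT);
    assert_eq!(flipped >> 63, 1);
    assert_eq!(flipped & LOW_MASK, 5);

    // ...and flipping again wraps back to pool 0.
    let flipped_twice = flipped.wrapping_add(HIGH_BIT);
    assert_eq!(flipped_twice >> 63, 0);
    assert_eq!(flipped_twice & LOW_MASK, 5);
}
```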
+    snapshot_mutex: Arc<Mutex<u64>>,
+}
+
+impl LockFreeHistogram {
+    pub fn new(grouping_power: u8, max_value_power: u8) -> Self {
+        Self {
+            hot_idx_and_count: AtomicU64::new(0),
+            pool_counts: [AtomicU64::new(0), AtomicU64::new(0)],
+            histogram_pool: [
+                AtomicHistogram::new(grouping_power, max_value_power).unwrap(),
+                AtomicHistogram::new(grouping_power, max_value_power).unwrap(),
+            ],
+            snapshot_mutex: Arc::new(Mutex::new(0)),
+        }
+    }
+
+    pub fn increment(&self, value: u64) -> Result<(), LFError> {
+        // Increment started observations count.
+        let n = self.hot_idx_and_count.fetch_add(1, ORDER_TYPE);
+        let hot_idx = (n >> 63) as usize;
+
+        // Increment the corresponding bucket value.
+        self.histogram_pool[hot_idx].increment(value)?;
+
+        // Increment finished observations count.
+        self.pool_counts[hot_idx].fetch_add(1, ORDER_TYPE);
+        Ok(())
+    }
+
+    pub fn snapshot(&self) -> Result<Histogram, LFError> {
+        // Lock the "writers" mutex.
+        let _guard = self.snapshot_mutex.lock();
+        if _guard.is_err() {
+            return Err(LFError::Mutex);
+        }
+        let _guard = _guard.unwrap();
+
+        // Switch the hot-cold index (wrapping add on highest bit).
+        // Note: n is the one from *before* the addition, so we repeat it.
+        let n = self
+            .hot_idx_and_count
+            .fetch_add(HIGH_BIT, ORDER_TYPE)
+            .wrapping_add(HIGH_BIT);
+
+        let started_count = n & LOW_MASK;
+
+        let hot_idx = (n >> 63) as usize;
+        let cold_idx = 1 - hot_idx;
+
+        let hot_counts = &self.pool_counts[hot_idx];
+        let cold_counts = &self.pool_counts[cold_idx];
+
+        // Wait until the old hot observers (now working on the currently
+        // cold bucket pool) finish their job.
+        // (Since observer's job is fast, we can wait in a spin loop).
+        while started_count != cold_counts.load(ORDER_TYPE) {
+            hint::spin_loop();
+        }
+
+        // Now there are no active observers on the cold pool, so we can safely
+        // access the data without a logical race.
+        let result = self.histogram_pool[cold_idx].load();
+
+        // Compound the cold histogram results onto the already running hot ones.
+        // Note that no snapshot operation can run now as we still hold
+        // the mutex, so it doesn't matter that the entire operation isn't atomic.
+
+        // Update finished observations' counts.
+        hot_counts.fetch_add(cold_counts.load(ORDER_TYPE), ORDER_TYPE);
+        cold_counts.store(0, ORDER_TYPE);
+
+        // Update hot bucket values.
+        let snapshot = self.histogram_pool[cold_idx].drain();
+        for bucket in snapshot.into_iter() {
+            self.histogram_pool[hot_idx].add(bucket.start(), bucket.count())?;
+        }
+
+        Ok(result)
+    }
+}
+
+impl Default for LockFreeHistogram {
+    fn default() -> Self {
+        // Config: 64ms error, values in range [0ms, ~262_000ms].
+        // Size: ~2^12 * 8B * 2 = 64 KiB.
+        let grouping_power = 12;
+        let max_value_power = 18;
+        LockFreeHistogram::new(grouping_power, max_value_power)
+    }
+}
+
+impl std::fmt::Debug for LockFreeHistogram {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let h0 = self.histogram_pool[0].load();
+        let h1 = self.histogram_pool[1].load();
+        write!(f,
+            "LockFreeHistogram {{ hot_idx_and_count: {:?}, pool_counts: {:?}, histogram_pool: {:?}, snapshot_mutex: {:?} }}",
+            self.hot_idx_and_count,
+            self.pool_counts,
+            [h0, h1],
+            self.snapshot_mutex,
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{LockFreeHistogram, ORDER_TYPE};
+    use std::sync::{atomic::AtomicU64, Arc};
+    use std::thread;
+
+    fn spawn_all(
+        observer: impl Fn() + Clone + Send + 'static,
+        observer_count: u64,
+        snapshot: impl Fn() + Clone + Send + 'static,
+        snapshot_count: u64,
+    ) {
+        // Spawn all threads.
+ // Note: snapshots must be joined first! + // That's because observers wait for snapshots to + // finish correctly, so we need to catch the snapshot + // panic before joining any observer thread. + let mut handles = Vec::new(); + for _ in 0..snapshot_count { + handles.push(thread::spawn(snapshot.clone())); + } + for _ in 0..observer_count { + handles.push(thread::spawn(observer.clone())); + } + + for thread in handles { + thread.join().unwrap(); + } + } + + /// Information liveness check on consecutive histogram snapshots. + /// + /// Setup: + /// - observer threads atomically incrementing the histogram, + /// - snapshot threads fighting for exclusive access. + /// + /// Expected behavior: + /// - all snapshot threads see constantly updating values, + /// - no deadlock occurs. + fn liveness(observer_count: u64, snapshot_count: u64, rep: u64) { + let h = Arc::new(LockFreeHistogram::default()); + // The number of finished snapshot *threads*. + let finished_snapshots = Arc::new(AtomicU64::new(0)); + // The number of finished observations. + let obs_done = Arc::new(AtomicU64::new(0)); + + // Observer thread lambda. + let h_1 = h.clone(); + let fin_s_1 = finished_snapshots.clone(); + let obs_done_1 = obs_done.clone(); + let observer = move || { + while fin_s_1.load(ORDER_TYPE) < snapshot_count { + h_1.increment(0).unwrap(); + obs_done_1.fetch_add(1, ORDER_TYPE); + } + }; + + // Snapshot thread lambda. + let snapshot = move || { + let mut prev_obs = obs_done.load(ORDER_TYPE); + let mut prev_count = 0; // This is not true, but we don't care. + + for _ in 0..rep { + let cur_obs = obs_done.load(ORDER_TYPE); + let s = h.snapshot().unwrap(); + + // If an observation occured since the last iteration. + if cur_obs != prev_obs { + // Make sure we noticed the new observation in the snapshot. + let count = s.percentile(100.0).unwrap().unwrap().count(); + assert!(prev_count != count); + + prev_count = count; + prev_obs = cur_obs; + } + } + // It wasn't an empty run. + assert!(prev_obs != 0); + // Mark as finished. + finished_snapshots.fetch_add(1, ORDER_TYPE); + }; + + spawn_all(observer, observer_count, snapshot, snapshot_count); + } + + #[test] + fn liveness_tests() { + liveness(1, 2, 1000); + liveness(2, 1, 1000); + liveness(10, 10, 100); + } + + /// Atomic snapshot test. + /// + /// Setup: + /// - 1 observer alternating between incrementing two distinct values + /// of the histogram, + /// - 1 snapshot. + /// + /// Expected behavior: + /// - the difference between said values in each snapshot is either 1 or 0. + #[test] + fn atomic_tests() { + // The number of snapshots taken. + let rep = 10_000; + let snapshot_count = 1; + // A value of which increment affects a bucket other than the first one. + let value_in_other_bucket = 2_u64.pow(10); + + // The histogram is non-empty for code simplicity. + let h = Arc::new(LockFreeHistogram::default()); + h.increment(0).unwrap(); + h.increment(value_in_other_bucket).unwrap(); + + // The number of finished snapshot *threads*. + let finished_snapshots = Arc::new(AtomicU64::new(0)); + + // Observer thread lambda. + let h_1 = h.clone(); + let fin_s_1 = finished_snapshots.clone(); + let observer = move || { + while fin_s_1.load(ORDER_TYPE) < snapshot_count { + h_1.increment(0).unwrap(); + h_1.increment(value_in_other_bucket).unwrap(); + } + }; + + // Snapshot thread lambda. + let snapshot = move || { + for _ in 0..rep { + let s = h.snapshot().unwrap(); + // Get count at first bucket. 
+ let bucket_front = s.percentile(10.0).unwrap().unwrap(); + let count_front = bucket_front.count(); + // Get count at the other bucket. + let bucket_back = s.percentile(100.0).unwrap().unwrap(); + let count_back = bucket_back.count(); + + // Make sure the test actually works + // (the other bucket is not the first one). + assert!(bucket_front != bucket_back); + // Make sure the snapshot was atomic. + assert!(count_front >= count_back && count_front - count_back <= 1); + } + // Mark as finished. + finished_snapshots.fetch_add(1, ORDER_TYPE); + }; + + spawn_all(observer, 1, snapshot, 1); + } +} diff --git a/scylla/src/transport/metrics.rs b/scylla/src/transport/metrics.rs index 20919004be..4bcb900d52 100644 --- a/scylla/src/transport/metrics.rs +++ b/scylla/src/transport/metrics.rs @@ -1,24 +1,36 @@ -use crate::transport::histogram::{Histogram, Snapshot}; +use crate::transport::lock_free_histogram::{LFError, LockFreeHistogram}; +use histogram::Histogram; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use thiserror::Error; const ORDER_TYPE: Ordering = Ordering::Relaxed; -#[derive(Debug)] -pub struct MetricsError { - cause: &'static str, -} - -impl From<&'static str> for MetricsError { - fn from(err: &'static str) -> MetricsError { - MetricsError { cause: err } - } +#[derive(Error, Debug, PartialEq)] +pub enum MetricsError { + #[error("lock-free histogram error")] + LFHistogramErr(#[from] LFError), + #[error("histogram error")] + HistogramErr(#[from] histogram::Error), + #[error("histogram is empty")] + Empty, } -impl std::fmt::Display for MetricsError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "metrics error: {}", self.cause) - } +/// Snapshot is a structure that contains histogram statistics such as +/// min, max, mean, standard deviation, median, and most common percentiles +/// collected in a certain moment. +#[derive(Debug)] +pub struct Snapshot { + pub min: u64, + pub max: u64, + pub mean: u64, + pub stddev: u64, + pub median: u64, + pub percentile_75: u64, + pub percentile_90: u64, + pub percentile_95: u64, + pub percentile_99: u64, + pub percentile_99_9: u64, } #[derive(Default, Debug)] @@ -28,7 +40,7 @@ pub struct Metrics { errors_iter_num: AtomicU64, queries_iter_num: AtomicU64, retries_num: AtomicU64, - histogram: Arc, + histogram: Arc, } impl Metrics { @@ -39,7 +51,7 @@ impl Metrics { errors_iter_num: AtomicU64::new(0), queries_iter_num: AtomicU64::new(0), retries_num: AtomicU64::new(0), - histogram: Arc::new(Histogram::new()), + histogram: Arc::new(LockFreeHistogram::default()), } } @@ -82,8 +94,7 @@ impl Metrics { /// Returns average latency in milliseconds pub fn get_latency_avg_ms(&self) -> Result { - let mean = self.histogram.log_operation(Histogram::mean())?; - Ok(mean) + Self::mean(self.histogram.snapshot()?) } /// Returns latency from histogram for a given percentile @@ -91,18 +102,38 @@ impl Metrics { /// /// * `percentile` - float value (0.0 - 100.0) pub fn get_latency_percentile_ms(&self, percentile: f64) -> Result { - let result = self - .histogram - .log_operation(Histogram::percentile(percentile))?; - Ok(result) + let bucket = self.histogram.snapshot()?.percentile(percentile)?; + + if let Some(p) = bucket { + Ok(p.count()) + } else { + Err(MetricsError::Empty) + } } /// Returns snapshot of histogram metrics taken at the moment of calling this function. \ /// Available metrics: min, max, mean, std_dev, median, /// percentile_90, percentile_95, percentile_99, percentile_99_9. 
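For context, a sketch of how these accessors might be consumed from application code. The `Session::get_metrics()` call, the import path, the function name, and the `metrics` feature gating are assumptions about the surrounding driver API and are not shown in this diff:

```rust
// Caller-side sketch (assumes a configured `Session` obtained elsewhere
// and the driver's `metrics` feature being enabled).
use scylla::Session;

fn print_latency_stats(session: &Session) {
    let metrics = session.get_metrics();
    if let Ok(avg) = metrics.get_latency_avg_ms() {
        println!("average latency: {avg} ms");
    }
    if let Ok(p99_9) = metrics.get_latency_percentile_ms(99.9) {
        println!("99.9th percentile: {p99_9}");
    }
}
```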
pub fn get_snapshot(&self) -> Result { - let snapshot = self.histogram.log_operation(Histogram::snapshot())?; - Ok(snapshot) + let h = self.histogram.snapshot()?; + + let (min, max) = Self::minmax(h.clone())?; + + let percentile_args = [0.75, 0.9, 0.95, 0.99, 0.999]; + let percentiles = Self::percentiles(&h, &percentile_args)?; + + Ok(Snapshot { + min, + max, + mean: Self::mean(h.clone())?, + stddev: Self::stddev(h.clone())?, + median: Self::median(h)?, + percentile_75: percentiles[0], + percentile_90: percentiles[1], + percentile_95: percentiles[2], + percentile_99: percentiles[3], + percentile_99_9: percentiles[4], + }) } /// Returns counter for errors occurred in nonpaged queries @@ -129,4 +160,83 @@ impl Metrics { pub fn get_retries_num(&self) -> u64 { self.retries_num.load(ORDER_TYPE) } + + /// Metric implementations + + fn mean(h: Histogram) -> Result { + // Compute the mean (count each bucket as its interval's center). + let mut weighted_sum = 0_u128; + let mut count = 0_u128; + + for bucket in h.into_iter() { + let mid = ((bucket.start() + bucket.end()) / 2) as u128; + weighted_sum += mid * bucket.count() as u128; + count += bucket.count() as u128; + } + + if count != 0 { + Ok((weighted_sum / count) as u64) + } else { + Err(MetricsError::Empty) + } + } + + fn median(h: Histogram) -> Result { + // TODO: get the real median by iterating through the buckets. + Ok(0) + } + + fn percentiles(h: &Histogram, percentiles: &[f64]) -> Result, MetricsError> { + let res = h.percentiles(percentiles)?; + if let Some(ps) = res { + Ok(ps.into_iter().map(|(_, bucket)| bucket.count()).collect()) + } else { + Err(MetricsError::Empty) + } + } + + fn stddev(h: Histogram) -> Result { + let total_count = h + .clone() + .into_iter() + .map(|bucket| bucket.count() as u128) + .sum::(); + + let mean = Self::mean(h.clone())? as u128; + let mut variance_sum = 0; + for bucket in h.into_iter() { + let count = bucket.count() as u128; + let mid = ((bucket.start() + bucket.end()) / 2) as u128; + + variance_sum += count * (mid - mean).pow(2); + } + let variance = variance_sum / total_count; + + return Ok((variance as f64).sqrt() as u64); + } + + fn minmax(h: Histogram) -> Result<(u64, u64), MetricsError> { + let mut min = u64::MAX; + let mut max = 0; + for bucket in h.clone().into_iter() { + if bucket.count() == 0 { + continue; + } + let lower_bound = bucket.start(); + let upper_bound = bucket.end(); + + if lower_bound < min { + min = lower_bound; + } + if upper_bound > max { + max = upper_bound; + } + } + + if min > max { + Err(MetricsError::Empty) + } else { + Ok((min, max)) + } + } } diff --git a/scylla/src/transport/mod.rs b/scylla/src/transport/mod.rs index debe23f331..7e368c810d 100644 --- a/scylla/src/transport/mod.rs +++ b/scylla/src/transport/mod.rs @@ -5,14 +5,14 @@ mod connection_pool; pub mod downgrading_consistency_retry_policy; pub mod errors; pub mod execution_profile; -#[cfg(feature = "metrics")] -pub mod histogram; pub mod host_filter; pub mod iterator; pub mod legacy_query_result; pub mod load_balancing; pub mod locator; #[cfg(feature = "metrics")] +pub mod lock_free_histogram; +#[cfg(feature = "metrics")] pub(crate) mod metrics; mod node; pub mod partitioner;
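Finally, a usage sketch for the new type itself, mirroring the unit tests above: several writer threads call `increment` while a reader takes a consistent `snapshot` and inspects it through the `histogram` crate's bucket API. The import path and feature gating are assumptions based on the `mod.rs` change above:

```rust
// Assumes the `metrics` feature is enabled and the module is reachable
// at this path; only methods shown in this diff are used.
use std::sync::Arc;
use std::thread;

use scylla::transport::lock_free_histogram::LockFreeHistogram;

fn main() {
    let h = Arc::new(LockFreeHistogram::default());

    // Writers increment concurrently and lock-free.
    let writers: Vec<_> = (0..4)
        .map(|_| {
            let h = h.clone();
            thread::spawn(move || {
                for latency_ms in 1..=1000u64 {
                    h.increment(latency_ms).unwrap();
                }
            })
        })
        .collect();
    for w in writers {
        w.join().unwrap();
    }

    // `snapshot()` folds the cold pool into the hot one and returns a
    // plain `histogram::Histogram` that can be queried offline.
    let snapshot = h.snapshot().unwrap();
    if let Some(bucket) = snapshot.percentile(99.0).unwrap() {
        println!("p99 falls in bucket [{}, {}]", bucket.start(), bucket.end());
    }
}
```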