Add IO metrics #26804

Merged
merged 8 commits into from
Aug 2, 2022
255 changes: 255 additions & 0 deletions core/src/system_monitor_service.rs
@@ -22,12 +22,15 @@ const SAMPLE_INTERVAL_UDP_MS: u64 = 2 * MS_PER_S;
const SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS: u64 = MS_PER_H;
const SAMPLE_INTERVAL_MEM_MS: u64 = MS_PER_S;
const SAMPLE_INTERVAL_CPU_MS: u64 = MS_PER_S;
const SAMPLE_INTERVAL_DISK_MS: u64 = MS_PER_S;
const SLEEP_INTERVAL: Duration = Duration::from_millis(500);

#[cfg(target_os = "linux")]
const PROC_NET_SNMP_PATH: &str = "/proc/net/snmp";
#[cfg(target_os = "linux")]
const PROC_NET_DEV_PATH: &str = "/proc/net/dev";
#[cfg(target_os = "linux")]
const PROC_DISKSTATS_PATH: &str = "/proc/diskstats";

pub struct SystemMonitorService {
thread_hdl: JoinHandle<()>,
@@ -96,6 +99,32 @@ struct CpuInfo {
num_threads: u64,
}

#[derive(Default)]
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
// These stats are aggregated across all storage devices excluding internal loopbacks.
// Fields are cumulative since boot with the exception of 'num_disks' and 'io_in_progress'
struct DiskStats {
reads_completed: u64,
reads_merged: u64,
sectors_read: u64,
time_reading_ms: u64,
writes_completed: u64,
writes_merged: u64,
sectors_written: u64,
time_writing_ms: u64,
io_in_progress: u64,
time_io_ms: u64,
// weighted time multiplies time performing IO by number of commands in the queue
time_io_weighted_ms: u64,
discards_completed: u64,
discards_merged: u64,
sectors_discarded: u64,
time_discarding: u64,
flushes_completed: u64,
time_flushing: u64,
num_disks: u64,
}

impl UdpStats {
fn from_map(udp_stats: &HashMap<String, u64>) -> Self {
Self {
@@ -223,12 +252,61 @@ pub fn verify_net_stats_access() -> Result<(), String> {
Ok(())
}

#[cfg(target_os = "linux")]
fn read_disk_stats() -> Result<DiskStats, String> {
let file_path_diskstats = PROC_DISKSTATS_PATH;
let file_diskstats = File::open(file_path_diskstats).map_err(|e| e.to_string())?;
let mut reader_diskstats = BufReader::new(file_diskstats);
parse_disk_stats(&mut reader_diskstats)
}

#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
fn parse_disk_stats(reader_diskstats: &mut impl BufRead) -> Result<DiskStats, String> {
let mut stats = DiskStats::default();
let mut num_disks = 0;
for line in reader_diskstats.lines() {
let line = line.map_err(|e| e.to_string())?;
let values: Vec<_> = line.split_ascii_whitespace().collect();
Contributor:
collect() isn't strictly necessary. We could next() our way to victory on the iterator instead.
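A minimal sketch of what that suggestion could look like: fields are pulled off the `split_ascii_whitespace()` iterator one at a time, so no intermediate `Vec` is allocated. The helper names `next_u64` and `parse_reads` are hypothetical and only the first two counters are parsed; this is an illustration, not the code that was merged.

```rust
// Hypothetical helper (not from the PR): takes the next whitespace-separated
// field from the iterator and parses it as u64, naming the field on failure.
fn next_u64<'a>(
    fields: &mut impl Iterator<Item = &'a str>,
    name: &str,
) -> Result<u64, String> {
    fields
        .next()
        .ok_or_else(|| format!("missing field: {}", name))?
        .parse::<u64>()
        .map_err(|e| format!("invalid {}: {}", name, e))
}

// Parses the leading columns of one diskstats-style line without collecting
// the fields into a Vec first. Returns (device name, reads completed, reads merged).
fn parse_reads(line: &str) -> Result<(String, u64, u64), String> {
    let mut fields = line.split_ascii_whitespace();
    let _major = next_u64(&mut fields, "major")?;
    let _minor = next_u64(&mut fields, "minor")?;
    let device = fields.next().ok_or("missing device name")?.to_string();
    let reads_completed = next_u64(&mut fields, "reads_completed")?;
    let reads_merged = next_u64(&mut fields, "reads_merged")?;
    Ok((device, reads_completed, reads_merged))
}
```

One trade-off of this style is that the field count is no longer known up front, so a length check like the one in the diff would have to be replaced by per-field errors.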


if values.len() != 20 {
return Err("parse error, expected exactly 20 disk stat elements".to_string());
Contributor:
We're probably screwed here, but would it make sense to log and continue instead?

Contributor Author:
I think it's better to report no metrics than to risk reporting incorrect ones. I added some tolerance for all three kernel variations that I'm aware of (11, 15, or 17 elements).
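A hedged sketch of the tolerance described in the reply, assuming the counts 11, 15, and 17 refer to the counters that follow the major number, minor number, and device name. The names `ACCEPTED_COUNTER_COUNTS` and `parse_counters` are hypothetical, and zero-padding the missing trailing counters is an assumption made for illustration.

```rust
// Counter counts mentioned in the review thread, assumed here to exclude
// the three leading identifier columns (major, minor, device name).
const ACCEPTED_COUNTER_COUNTS: [usize; 3] = [11, 15, 17];

// Hypothetical helper (not from the PR): parses the counters on one line and
// pads absent trailing counters with zero. Any other count is an error, so a
// malformed line yields no metrics rather than wrong ones.
fn parse_counters(line: &str) -> Result<[u64; 17], String> {
    let mut counters = [0u64; 17];
    let mut count = 0;
    for field in line.split_ascii_whitespace().skip(3) {
        if count == counters.len() {
            return Err("parse error, too many disk stat elements".to_string());
        }
        counters[count] = field.parse::<u64>().map_err(|e| e.to_string())?;
        count += 1;
    }
    if !ACCEPTED_COUNTER_COUNTS.contains(&count) {
        return Err(format!(
            "parse error, unexpected number of disk stat elements: {}",
            count
        ));
    }
    Ok(counters)
}
```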

}
if values[2].starts_with("loop") || values[1].ne("0") {
Contributor:
This will double-count at least dm-crypt volumes.

$ cat /proc/diskstats | grep dm
 253       0 dm-0 182486 0 7848082 48716 706874 0 22199432 10941388 0 610468 10990104 0 0 0 0 0 0

Contributor Author:
I think this is solved by using sysfs instead of procfs, since we'd only look at block devices. Does that sound right to you?
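A rough sketch of the sysfs idea, under stated assumptions: whole block devices appear as entries under a directory such as /sys/block, and device-mapper entries (such as the dm-0 shown above) still need an explicit name filter. The functions `is_skipped_device` and `list_block_devices` and the exact prefixes are hypothetical; the directory is passed in as a parameter so the logic can be exercised against any path.

```rust
use std::{fs, io, path::Path};

// Hypothetical filter (an assumption, not the PR's code): skip loopback
// devices and device-mapper volumes, which would otherwise be counted on
// top of the physical devices that back them.
fn is_skipped_device(name: &str) -> bool {
    name.starts_with("loop") || name.starts_with("dm-")
}

// Lists the device names found under a sysfs-style directory, minus the
// skipped ones. Names are sorted so the result is deterministic.
fn list_block_devices(sys_block: &Path) -> io::Result<Vec<String>> {
    let mut names = Vec::new();
    for entry in fs::read_dir(sys_block)? {
        let name = entry?.file_name().to_string_lossy().into_owned();
        if !is_skipped_device(&name) {
            names.push(name);
        }
    }
    names.sort();
    Ok(names)
}
```

Each remaining device's counters would then be read from its own stat file and summed, in place of the per-line filtering on the minor number.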

// Filter out the loopback io devices.
// Only look at raw device (filter partitions)
continue;
}

num_disks += 1;
stats.reads_completed += values[3].parse::<u64>().map_err(|e| e.to_string())?;
Contributor:
Similarly, should we be bailing out entirely or just continuing?

Contributor Author (bw-solana, Aug 3, 2022):
I think it's better to report no metrics than to risk reporting incorrect ones. Hopefully parsing succeeds on the next pass and we just report the delta for a longer time period.
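The behaviour described in the reply can be sketched as follows: a failed sample leaves the previous baseline untouched, so the next successful sample reports a delta that spans the gap. `Counters` and `process_sample` are hypothetical stand-ins with a single counter; the sketch only assumes the `Option`-based baseline and `saturating_sub` that the diff itself uses.

```rust
// Hypothetical single-counter stand-in for the DiskStats struct in the diff.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct Counters {
    reads_completed: u64,
}

// Returns the delta to report, if any. On a parse error nothing is reported
// and the old baseline is kept, so the next successful sample covers a
// longer period instead of producing a wrong value.
fn process_sample(
    previous: &mut Option<Counters>,
    sample: Result<Counters, String>,
) -> Option<u64> {
    match sample {
        Ok(new) => {
            let delta = previous
                .as_ref()
                .map(|old| new.reads_completed.saturating_sub(old.reads_completed));
            *previous = Some(new);
            delta
        }
        Err(_) => None,
    }
}
```

The first successful sample reports nothing because there is no baseline yet, and `saturating_sub` clamps the delta to zero if a cumulative counter ever goes backwards.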

stats.reads_merged += values[4].parse::<u64>().map_err(|e| e.to_string())?;
stats.sectors_read += values[5].parse::<u64>().map_err(|e| e.to_string())?;
stats.time_reading_ms += values[6].parse::<u64>().map_err(|e| e.to_string())?;
stats.writes_completed += values[7].parse::<u64>().map_err(|e| e.to_string())?;
stats.writes_merged += values[8].parse::<u64>().map_err(|e| e.to_string())?;
stats.sectors_written += values[9].parse::<u64>().map_err(|e| e.to_string())?;
stats.time_writing_ms += values[10].parse::<u64>().map_err(|e| e.to_string())?;
stats.io_in_progress += values[11].parse::<u64>().map_err(|e| e.to_string())?;
stats.time_io_ms += values[12].parse::<u64>().map_err(|e| e.to_string())?;
stats.time_io_weighted_ms += values[13].parse::<u64>().map_err(|e| e.to_string())?;
stats.discards_completed += values[14].parse::<u64>().map_err(|e| e.to_string())?;
stats.discards_merged += values[15].parse::<u64>().map_err(|e| e.to_string())?;
stats.sectors_discarded += values[16].parse::<u64>().map_err(|e| e.to_string())?;
stats.time_discarding += values[17].parse::<u64>().map_err(|e| e.to_string())?;
stats.flushes_completed += values[18].parse::<u64>().map_err(|e| e.to_string())?;
stats.time_flushing += values[19].parse::<u64>().map_err(|e| e.to_string())?;
}
stats.num_disks = num_disks;
Ok(stats)
}

impl SystemMonitorService {
pub fn new(
exit: Arc<AtomicBool>,
report_os_memory_stats: bool,
report_os_network_stats: bool,
report_os_cpu_stats: bool,
report_os_disk_stats: bool,
) -> Self {
info!("Starting SystemMonitorService");
let thread_hdl = Builder::new()
@@ -239,6 +317,7 @@ impl SystemMonitorService {
report_os_memory_stats,
report_os_network_stats,
report_os_cpu_stats,
report_os_disk_stats,
);
})
.unwrap();
@@ -572,17 +651,155 @@ impl SystemMonitorService {
}
}

#[cfg(target_os = "linux")]
fn process_disk_stats(disk_stats: &mut Option<DiskStats>) {
match read_disk_stats() {
Ok(new_stats) => {
if let Some(old_stats) = disk_stats {
Self::report_disk_stats(old_stats, &new_stats);
}
*disk_stats = Some(new_stats);
}
Err(e) => warn!("read_disk_stats: {}", e),
}
}

#[cfg(not(target_os = "linux"))]
fn process_disk_stats(_disk_stats: &mut Option<DiskStats>) {}

#[cfg(target_os = "linux")]
fn report_disk_stats(old_stats: &DiskStats, new_stats: &DiskStats) {
datapoint_info!(
"disk-stats",
(
"reads_completed",
new_stats
.reads_completed
.saturating_sub(old_stats.reads_completed),
i64
),
(
"reads_merged",
new_stats
.reads_merged
.saturating_sub(old_stats.reads_merged),
i64
),
(
"sectors_read",
new_stats
.sectors_read
.saturating_sub(old_stats.sectors_read),
i64
),
(
"time_reading_ms",
new_stats
.time_reading_ms
.saturating_sub(old_stats.time_reading_ms),
i64
),
(
"writes_completed",
new_stats
.writes_completed
.saturating_sub(old_stats.writes_completed),
i64
),
(
"writes_merged",
new_stats
.writes_merged
.saturating_sub(old_stats.writes_merged),
i64
),
(
"sectors_written",
new_stats
.sectors_written
.saturating_sub(old_stats.sectors_written),
i64
),
(
"time_writing_ms",
new_stats
.time_writing_ms
.saturating_sub(old_stats.time_writing_ms),
i64
),
("io_in_progress", new_stats.io_in_progress, i64),
(
"time_io_ms",
new_stats.time_io_ms.saturating_sub(old_stats.time_io_ms),
i64
),
(
"time_io_weighted_ms",
new_stats
.time_io_weighted_ms
.saturating_sub(old_stats.time_io_weighted_ms),
i64
),
(
"discards_completed",
new_stats
.discards_completed
.saturating_sub(old_stats.discards_completed),
i64
),
(
"discards_merged",
new_stats
.discards_merged
.saturating_sub(old_stats.discards_merged),
i64
),
(
"sectors_discarded",
new_stats
.sectors_discarded
.saturating_sub(old_stats.sectors_discarded),
i64
),
(
"time_discarding",
new_stats
.time_discarding
.saturating_sub(old_stats.time_discarding),
i64
),
(
"flushes_completed",
new_stats
.flushes_completed
.saturating_sub(old_stats.flushes_completed),
i64
),
(
"time_flushing",
new_stats
.time_flushing
.saturating_sub(old_stats.time_flushing),
i64
),
("num_disks", new_stats.num_disks, i64),
)
}

pub fn run(
exit: Arc<AtomicBool>,
report_os_memory_stats: bool,
report_os_network_stats: bool,
report_os_cpu_stats: bool,
report_os_disk_stats: bool,
) {
let mut udp_stats = None;
let mut disk_stats = None;
let network_limits_timer = AtomicInterval::default();
let udp_timer = AtomicInterval::default();
let mem_timer = AtomicInterval::default();
let cpu_timer = AtomicInterval::default();
let disk_timer = AtomicInterval::default();

loop {
if exit.load(Ordering::Relaxed) {
@@ -602,6 +819,9 @@ impl SystemMonitorService {
if report_os_cpu_stats && cpu_timer.should_update(SAMPLE_INTERVAL_CPU_MS) {
Self::report_cpu_stats();
}
if report_os_disk_stats && disk_timer.should_update(SAMPLE_INTERVAL_DISK_MS) {
Self::process_disk_stats(&mut disk_stats);
}
sleep(SLEEP_INTERVAL);
}
}
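The loop above sleeps for SLEEP_INTERVAL (500 ms) per iteration and gates each stat family behind its own timer. A minimal sketch of that gating idea, assuming a timer that fires once at least the interval has elapsed since it last fired and that also fires on its first check. `IntervalGate` is a hypothetical stand-in, not the `AtomicInterval` type used in the diff, and timestamps are passed in explicitly to keep the behaviour deterministic.

```rust
// Hypothetical interval gate (an assumption for illustration only).
#[derive(Default)]
struct IntervalGate {
    last_update_ms: Option<u64>,
}

impl IntervalGate {
    // Returns true, and records `now_ms`, when no update has happened yet or
    // when at least `interval_ms` has elapsed since the last recorded update.
    fn should_update(&mut self, now_ms: u64, interval_ms: u64) -> bool {
        let due = match self.last_update_ms {
            None => true,
            Some(last) => now_ms.saturating_sub(last) >= interval_ms,
        };
        if due {
            self.last_update_ms = Some(now_ms);
        }
        due
    }
}
```

With a 500 ms loop period and a 1000 ms sample interval, a gate like this would fire on roughly every second iteration.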
@@ -670,6 +890,41 @@ data" as &[u8];
assert!(stats.is_err());
}

#[test]
fn test_parse_disk_stats() {
const MOCK_DISK: &[u8] =
b" 7 0 loop0 108 0 2906 27 0 0 0 0 0 40 27 0 0 0 0 0 0
7 1 loop1 48 0 716 23 0 0 0 0 0 28 23 0 0 0 0 0 0
7 2 loop2 108 0 2916 21 0 0 0 0 0 36 21 0 0 0 0 0 0
7 3 loop3 257 0 4394 131 0 0 0 0 0 296 131 0 0 0 0 0 0
7 4 loop4 111 0 2896 62 0 0 0 0 0 68 62 0 0 0 0 0 0
7 5 loop5 110 0 2914 138 0 0 0 0 0 112 138 0 0 0 0 0 0
7 6 loop6 68 0 2200 47 0 0 0 0 0 44 47 0 0 0 0 0 0
7 7 loop7 1397 0 101686 515 0 0 0 0 0 4628 515 0 0 0 0 0 0
8 0 sda 40883273 294820 1408426268 30522643 352908152 204249001 37827695922 2073754124 0 86054536 2105005805 496399 4 1886486166 167545 18008621 561492
8 1 sda1 40882879 291543 1408408989 30522451 352908150 204249001 37827695920 2073754122 0 86054508 2104444115 496393 0 1886085576 167541 0 0
8 14 sda14 73 0 832 22 0 0 0 0 0 48 22 0 0 0 0 0 0
8 15 sda15 146 3277 9855 62 2 0 2 1 0 76 68 6 4 400590 3 0 0
7 9 loop9 55 0 2106 41 0 0 0 0 0 28 41 0 0 0 0 0 0
7 8 loop8 41 0 688 53 0 0 0 0 0 44 53 0 0 0 0 0 0
7 10 loop10 60 0 748 1 0 0 0 0 0 20 1 0 0 0 0 0 0
9 0 sdb 1 1 1 1 352908152 204249001 37827695922 2073754124 0 86054536 2105005805 496399 4 1886486166 167545 18008621 561492" as &[u8];
const UNEXPECTED_DATA: &[u8] = b"un
ex
pec
ted
data" as &[u8];

let mut mock_disk = MOCK_DISK;
let stats = parse_disk_stats(&mut mock_disk).unwrap();
assert_eq!(stats.reads_completed, 40883274);
assert_eq!(stats.time_flushing, 1122984);

let mut mock_disk = UNEXPECTED_DATA;
let stats = parse_disk_stats(&mut mock_disk);
assert!(stats.is_err());
}

#[test]
fn test_calc_percent() {
assert!(SystemMonitorService::calc_percent(99, 100) < 100.0);
3 changes: 3 additions & 0 deletions core/src/validator.rs
@@ -161,6 +161,7 @@ pub struct ValidatorConfig {
pub no_os_memory_stats_reporting: bool,
pub no_os_network_stats_reporting: bool,
pub no_os_cpu_stats_reporting: bool,
pub no_os_disk_stats_reporting: bool,
pub poh_pinned_cpu_core: usize,
pub poh_hashes_per_batch: u64,
pub account_indexes: AccountSecondaryIndexes,
@@ -223,6 +224,7 @@ impl Default for ValidatorConfig {
no_os_memory_stats_reporting: true,
no_os_network_stats_reporting: true,
no_os_cpu_stats_reporting: true,
no_os_disk_stats_reporting: true,
poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
account_indexes: AccountSecondaryIndexes::default(),
@@ -504,6 +506,7 @@ impl Validator {
!config.no_os_memory_stats_reporting,
!config.no_os_network_stats_reporting,
!config.no_os_cpu_stats_reporting,
!config.no_os_disk_stats_reporting,
));

let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded();
1 change: 1 addition & 0 deletions ledger-tool/src/main.rs
@@ -2392,6 +2392,7 @@ fn main() {
!no_os_memory_stats_reporting,
false,
false,
false,
);

accounts_index_config.index_limit_mb = if let Some(limit) =
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
@@ -48,6 +48,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
no_os_memory_stats_reporting: config.no_os_memory_stats_reporting,
no_os_network_stats_reporting: config.no_os_network_stats_reporting,
no_os_cpu_stats_reporting: config.no_os_cpu_stats_reporting,
no_os_disk_stats_reporting: config.no_os_disk_stats_reporting,
poh_pinned_cpu_core: config.poh_pinned_cpu_core,
account_indexes: config.account_indexes.clone(),
accounts_db_caching_enabled: config.accounts_db_caching_enabled,
6 changes: 6 additions & 0 deletions validator/src/main.rs
@@ -971,6 +971,11 @@ pub fn main() {
.long("no-os-cpu-stats-reporting")
.help("Disable reporting of OS CPU statistics.")
)
.arg(
Arg::with_name("no_os_disk_stats_reporting")
.long("no-os-disk-stats-reporting")
.help("Disable reporting of OS disk statistics.")
)
.arg(
Arg::with_name("accounts-hash-interval-slots")
.long("accounts-hash-interval-slots")
@@ -2641,6 +2646,7 @@ pub fn main() {
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"),
no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"),
no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"),
poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core")
.unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
poh_hashes_per_batch: value_of(&matches, "poh_hashes_per_batch")