Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoranYi authored and nickfrosty committed Jan 4, 2023
1 parent d5afdad commit cd2c573
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 225 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 0 additions & 91 deletions bucket_map/src/bucket_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,97 +181,6 @@ fn read_be_u64(input: &[u8]) -> u64 {
u64::from_be_bytes(input[0..std::mem::size_of::<u64>()].try_into().unwrap())
}

/// Utility function to get number of Mmap files for current process
#[cfg(target_os = "linux")]
pub fn get_mmap_count() -> Option<usize> {
let pid = std::process::id();
let map_path = format!("/proc/{}/maps", pid);
let output = std::process::Command::new("wc")
.args(["-l", &map_path])
.output()
.unwrap();
if output.status.success() {
let n: usize = std::str::from_utf8(&output.stdout)
.unwrap()
.split_whitespace()
.next()
.unwrap()
.parse()
.unwrap();

Some(n)
} else {
None
}
}

#[cfg(not(target_os = "linux"))]
pub fn get_mmap_count() -> Option<usize> {
None
}

/// Utility function to get open files limits on the system
#[cfg(target_os = "linux")]
pub fn get_open_fd_limits() -> Option<(usize, usize)> {
let run = |cmd, args| -> Option<usize> {
let output = std::process::Command::new(cmd).args(args).output().unwrap();
if output.status.success() {
let n: usize = std::str::from_utf8(&output.stdout)
.unwrap()
.split_whitespace()
.next()
.unwrap()
.parse()
.unwrap();

Some(n)
} else {
None
}
};

let soft_limit = run("sh", ["-c", "ulimit -Sn"])?;
let hard_limit = run("sh", ["-c", "ulimit -Hn"])?;

Some((soft_limit, hard_limit))
}

#[cfg(not(target_os = "linux"))]
pub fn get_open_fd_limits() -> Option<(usize, usize)> {
None
}

/// Utility function to get number of open file descriptors for current process
#[cfg(target_os = "linux")]
pub fn get_num_open_fd() -> Option<usize> {
let pid = std::process::id();
let fd_path = format!("/proc/{}/fd", pid);
let cmd = format!("ls -l {} | wc -l", fd_path);

let output = std::process::Command::new("sh")
.args(["-c", &cmd])
.output()
.unwrap();
if output.status.success() {
let n: usize = std::str::from_utf8(&output.stdout)
.unwrap()
.split_whitespace()
.next()
.unwrap()
.parse()
.unwrap();

Some(n)
} else {
None
}
}

#[cfg(not(target_os = "linux"))]
pub fn get_num_open_fd() -> Option<usize> {
None
}

#[cfg(test)]
mod tests {
use {
Expand Down
67 changes: 3 additions & 64 deletions bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use {
crate::{
bucket_map::{get_mmap_count, get_num_open_fd, get_open_fd_limits},
bucket_stats::BucketStats,
MaxSearch,
},
crate::{bucket_stats::BucketStats, MaxSearch},
memmap2::MmapMut,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
Expand Down Expand Up @@ -284,28 +280,11 @@ impl BucketStorage {
.create(true)
.open(file.clone())
.map_err(|e| {
let mmap_msg = get_mmap_count()
.map(|mmap_count| format!("current mmap_count: {}", mmap_count))
.unwrap_or_default();

let open_fd_msg = get_num_open_fd()
.map(|open_fd| format!("current open_fd: {}", open_fd))
.unwrap_or_default();

let limit_msg = get_open_fd_limits()
.map(|(soft_limit, hard_limit)| {
format!("soft_limit: {}, hard_limit: {}", soft_limit, hard_limit,)
})
.unwrap_or_default();

panic!(
"Unable to create data file {} in current dir({:?}): {:?}. {}, {}, {}",
"Unable to create data file {} in current dir({:?}): {:?}",
file.display(),
std::env::current_dir(),
e,
open_fd_msg,
mmap_msg,
limit_msg,
e
);
})
.unwrap();
Expand Down Expand Up @@ -404,7 +383,6 @@ impl BucketStorage {
mod test {
use {super::*, tempfile::tempdir};

#[cfg(target_os = "linux")]
#[test]
fn test_bucket_storage() {
let tmpdir = tempdir().unwrap();
Expand Down Expand Up @@ -434,44 +412,5 @@ mod test {
storage.free(ix, uid);
assert!(storage.is_free(ix));
assert_eq!(storage.uid(ix), None);

// test get_open_fd stats
let mmap_count = get_mmap_count().unwrap();
let open_fd = get_num_open_fd().unwrap();
let (soft_limit, hard_limit) = get_open_fd_limits().unwrap();

assert!(mmap_count > 0);
assert!(open_fd > 0);
assert!(soft_limit > 0);
assert!(hard_limit > 0);
}

/// bench get_mmap_count
/// 2M mmaps takes 1.5s
#[cfg(target_os = "linux")]
#[ignore]
#[test]
fn test_time_mmap() {
use std::time::Instant;

let mut v = vec![];
for i in 1..1900000 {
if i % 100 == 0 {
println!("{}", i);
}

let tmpdir = tempdir().unwrap();
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());
let s = BucketStorage::new(Arc::new(paths), 1, 1, 1, Arc::default(), Arc::default());
v.push(s);
}

let start = Instant::now();
let mmap_count = get_mmap_count().unwrap();
let duration = start.elapsed();

println!("{}", mmap_count);
println!("Time elapsed is: {:?}", duration);
}
}
1 change: 0 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ serde = "1.0.144"
serde_derive = "1.0.103"
solana-address-lookup-table-program = { path = "../programs/address-lookup-table", version = "=1.15.0" }
solana-bloom = { path = "../bloom", version = "=1.15.0" }
solana-bucket-map = { path = "../bucket_map", version = "=1.15.0" }
solana-client = { path = "../client", version = "=1.15.0" }
solana-entry = { path = "../entry", version = "=1.15.0" }
solana-frozen-abi = { path = "../frozen-abi", version = "=1.15.0" }
Expand Down
64 changes: 25 additions & 39 deletions core/src/system_monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use num_enum::{IntoPrimitive, TryFromPrimitive};
#[cfg(target_os = "linux")]
use std::{fs::File, io::BufReader};
use {
solana_bucket_map::bucket_map::{get_mmap_count, get_num_open_fd},
solana_sdk::timing::AtomicInterval,
std::{
collections::HashMap,
Expand All @@ -30,7 +29,6 @@ const SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS: u64 = MS_PER_H;
const SAMPLE_INTERVAL_MEM_MS: u64 = 5 * MS_PER_S;
const SAMPLE_INTERVAL_CPU_MS: u64 = 10 * MS_PER_S;
const SAMPLE_INTERVAL_DISK_MS: u64 = 5 * MS_PER_S;
const SAMPLE_INTERVAL_OPEN_FD_MS: u64 = 60 * MS_PER_S;
const SLEEP_INTERVAL: Duration = Duration::from_millis(500);

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -387,21 +385,25 @@ fn parse_disk_stats(reader_diskstats: &mut impl BufRead) -> Result<DiskStats, St
Ok(stats)
}

pub struct SystemMonitorStatsReportConfig {
pub report_os_memory_stats: bool,
pub report_os_network_stats: bool,
pub report_os_cpu_stats: bool,
pub report_os_disk_stats: bool,
pub report_os_open_fd_stats: bool,
}

impl SystemMonitorService {
pub fn new(exit: Arc<AtomicBool>, config: SystemMonitorStatsReportConfig) -> Self {
pub fn new(
exit: Arc<AtomicBool>,
report_os_memory_stats: bool,
report_os_network_stats: bool,
report_os_cpu_stats: bool,
report_os_disk_stats: bool,
) -> Self {
info!("Starting SystemMonitorService");
let thread_hdl = Builder::new()
.name("solSystemMonitr".to_string())
.spawn(move || {
Self::run(exit, config);
Self::run(
exit,
report_os_memory_stats,
report_os_network_stats,
report_os_cpu_stats,
report_os_disk_stats,
);
})
.unwrap();

Expand Down Expand Up @@ -830,22 +832,6 @@ impl SystemMonitorService {
Self::report_cpuid_values();
}

#[cfg(target_os = "linux")]
fn report_open_fd_stats() {
if let Some(curr_mmap_count) = get_mmap_count() {
if let Some(curr_open_fd) = get_num_open_fd() {
datapoint_info!(
"open-mmap-stats",
("number_mmap_files", curr_mmap_count, i64),
("number_open_fd", curr_open_fd, i64),
);
}
}
}

#[cfg(not(target_os = "linux"))]
fn report_open_fd_stats() {}

#[cfg(target_os = "linux")]
fn process_disk_stats(disk_stats: &mut Option<DiskStats>) {
match read_disk_stats() {
Expand Down Expand Up @@ -981,42 +967,42 @@ impl SystemMonitorService {
)
}

pub fn run(exit: Arc<AtomicBool>, config: SystemMonitorStatsReportConfig) {
pub fn run(
exit: Arc<AtomicBool>,
report_os_memory_stats: bool,
report_os_network_stats: bool,
report_os_cpu_stats: bool,
report_os_disk_stats: bool,
) {
let mut udp_stats = None;
let mut disk_stats = None;
let network_limits_timer = AtomicInterval::default();
let udp_timer = AtomicInterval::default();
let mem_timer = AtomicInterval::default();
let cpu_timer = AtomicInterval::default();
let disk_timer = AtomicInterval::default();
let open_fd_timer = AtomicInterval::default();

loop {
if exit.load(Ordering::Relaxed) {
break;
}
if config.report_os_network_stats {
if report_os_network_stats {
if network_limits_timer.should_update(SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS) {
Self::check_os_network_limits();
}
if udp_timer.should_update(SAMPLE_INTERVAL_UDP_MS) {
Self::process_net_stats(&mut udp_stats);
}
}
if config.report_os_memory_stats && mem_timer.should_update(SAMPLE_INTERVAL_MEM_MS) {
if report_os_memory_stats && mem_timer.should_update(SAMPLE_INTERVAL_MEM_MS) {
Self::report_mem_stats();
}
if config.report_os_cpu_stats && cpu_timer.should_update(SAMPLE_INTERVAL_CPU_MS) {
if report_os_cpu_stats && cpu_timer.should_update(SAMPLE_INTERVAL_CPU_MS) {
Self::report_cpu_stats();
}
if config.report_os_disk_stats && disk_timer.should_update(SAMPLE_INTERVAL_DISK_MS) {
if report_os_disk_stats && disk_timer.should_update(SAMPLE_INTERVAL_DISK_MS) {
Self::process_disk_stats(&mut disk_stats);
}
if config.report_os_open_fd_stats
&& open_fd_timer.should_update(SAMPLE_INTERVAL_OPEN_FD_MS)
{
Self::report_open_fd_stats();
}
sleep(SLEEP_INTERVAL);
}
}
Expand Down
17 changes: 5 additions & 12 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use {
sigverify,
snapshot_packager_service::SnapshotPackagerService,
stats_reporter_service::StatsReporterService,
system_monitor_service::{
verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
},
system_monitor_service::{verify_net_stats_access, SystemMonitorService},
tower_storage::TowerStorage,
tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE_MS},
tvu::{Tvu, TvuConfig, TvuSockets},
Expand Down Expand Up @@ -158,7 +156,6 @@ pub struct ValidatorConfig {
pub no_os_network_stats_reporting: bool,
pub no_os_cpu_stats_reporting: bool,
pub no_os_disk_stats_reporting: bool,
pub no_os_open_fd_stats_reporting: bool,
pub poh_pinned_cpu_core: usize,
pub poh_hashes_per_batch: u64,
pub process_ledger_before_services: bool,
Expand Down Expand Up @@ -221,7 +218,6 @@ impl Default for ValidatorConfig {
no_os_network_stats_reporting: true,
no_os_cpu_stats_reporting: true,
no_os_disk_stats_reporting: true,
no_os_open_fd_stats_reporting: true,
poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
process_ledger_before_services: false,
Expand Down Expand Up @@ -498,13 +494,10 @@ impl Validator {

let system_monitor_service = Some(SystemMonitorService::new(
Arc::clone(&exit),
SystemMonitorStatsReportConfig {
report_os_memory_stats: !config.no_os_memory_stats_reporting,
report_os_network_stats: !config.no_os_network_stats_reporting,
report_os_cpu_stats: !config.no_os_cpu_stats_reporting,
report_os_disk_stats: !config.no_os_disk_stats_reporting,
report_os_open_fd_stats: !config.no_os_open_fd_stats_reporting,
},
!config.no_os_memory_stats_reporting,
!config.no_os_network_stats_reporting,
!config.no_os_cpu_stats_reporting,
!config.no_os_disk_stats_reporting,
));

let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded();
Expand Down
Loading

0 comments on commit cd2c573

Please sign in to comment.