Skip to content

Commit

Permalink
use proper logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Dec 15, 2024
1 parent 1e043c1 commit df8b9d2
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions thread-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,10 @@ one may want to spawn many rayon pools.
* better metrics integration
* proper error handling everywhere
* even more tests


# Examples
All examples use `wrk` (an HTTP benchmarking tool) for workload generation. Please install it before running them.

* core_contention_basics will demonstrate why core contention is bad, and how thread configs can help
* core_contention_sweep will sweep across a range of core counts to show how benefits scale with core counts
71 changes: 20 additions & 51 deletions thread-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ pub struct RuntimeManagerConfig {

impl ThreadManager {
/// Looks up the native thread runtime registered under the public `name`.
///
/// The caller-facing name is first translated into the internal runtime key
/// via `native_runtime_mapping`, then the runtime itself is fetched.
/// Returns `None` if either the mapping entry or the runtime is absent.
pub fn get_native(&self, name: &str) -> Option<&NativeThreadRuntime> {
    // Shadowing `name` with the resolved internal key is intentional:
    // after this line only the internal key is meaningful.
    let name = self.native_runtime_mapping.get(name)?;
    self.native_thread_runtimes.get(name)
}
/// Looks up the rayon runtime registered under the public `name`.
///
/// Mirrors `get_native`: resolves the public name through
/// `rayon_runtime_mapping`, then fetches the runtime. Returns `None` if
/// either lookup misses.
pub fn get_rayon(&self, name: &str) -> Option<&RayonRuntime> {
    // Shadow `name` with the resolved internal key.
    let name = self.rayon_runtime_mapping.get(name)?;
    self.rayon_runtimes.get(name)
}
/// Looks up the tokio runtime registered under the public `name`.
///
/// Mirrors `get_native`/`get_rayon`: resolves the public name through
/// `tokio_runtime_mapping`, then fetches the runtime. Returns `None` if
/// either lookup misses.
pub fn get_tokio(&self, name: &str) -> Option<&TokioRuntime> {
    // Shadow `name` with the resolved internal key.
    let name = self.tokio_runtime_mapping.get(name)?;
    self.tokio_runtimes.get(name)
}
pub fn set_process_affinity(config: &RuntimeManagerConfig) -> anyhow::Result<Vec<usize>> {
let chosen_cores_mask = config.default_core_allocation.as_core_mask_vector();
Expand Down Expand Up @@ -158,8 +158,8 @@ mod tests {
// Nobody runs Agave on windows, and on Mac we can not set mask affinity without patching external crate
#[cfg(target_os = "linux")]
fn validate_affinity(expect_cores: &[usize], error_msg: &str) {
let aff = affinity::get_thread_affinity().unwrap();
assert_eq!(aff, expect_cores, "{}", error_msg);
let affinity = affinity::get_thread_affinity().unwrap();
assert_eq!(affinity, expect_cores, "{}", error_msg);
}
// Non-Linux fallback: a deliberate no-op, so affinity assertions are skipped
// on platforms where the thread mask cannot be read/set reliably.
#[cfg(not(target_os = "linux"))]
fn validate_affinity(_expect_cores: &[usize], _error_msg: &str) {}
Expand All @@ -181,44 +181,35 @@ mod tests {
..Default::default()
};

let rtm = ThreadManager::new(conf).unwrap();
let r = rtm.get_native("test").unwrap();
let manager = ThreadManager::new(conf).unwrap();
let runtime = manager.get_native("test").unwrap();

let t2 = r
let thread1 = runtime
.spawn(|| {
validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3");
})
.unwrap();

let t = std::thread::spawn(|| {
let thread2 = std::thread::spawn(|| {
validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");

let tt = std::thread::spawn(|| {
let inner_thread = std::thread::spawn(|| {
validate_affinity(
&[4, 5, 6, 7],
"Nested thread allocation should still be 4-7",
);
});
tt.join().unwrap();
inner_thread.join().unwrap();
});
t.join().unwrap();
t2.join().unwrap();
thread1.join().unwrap();
thread2.join().unwrap();
}

#[test]
fn rayon_affinity() {
let conf = RuntimeManagerConfig {
native_configs: HashMap::from([(
"pool1".to_owned(),
NativeConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 0, max: 4 },
max_threads: 5,
priority: 0,
..Default::default()
},
)]),
rayon_configs: HashMap::from([(
"rayon1".to_owned(),
"test".to_owned(),
RayonConfig {
core_allocation: CoreAllocation::DedicatedCoreSet { min: 1, max: 4 },
worker_threads: 3,
Expand All @@ -227,38 +218,16 @@ mod tests {
},
)]),
default_core_allocation: CoreAllocation::DedicatedCoreSet { min: 4, max: 8 },
native_runtime_mapping: HashMap::from([("test".to_owned(), "pool1".to_owned())]),

rayon_runtime_mapping: HashMap::from([("test".to_owned(), "rayon1".to_owned())]),
..Default::default()
};

let rtm = ThreadManager::new(conf).unwrap();
let r = rtm.get_native("test").unwrap();
let manager = ThreadManager::new(conf).unwrap();
let rayon_runtime = manager.get_rayon("test").unwrap();

let t2 = r
.spawn(|| {
validate_affinity(&[0, 1, 2, 3], "Managed thread allocation should be 0-3");
})
.unwrap();
let rrt = rtm.get_rayon("test").unwrap();

let t = std::thread::spawn(|| {
validate_affinity(&[4, 5, 6, 7], "Default thread allocation should be 4-7");

let tt = std::thread::spawn(|| {
validate_affinity(
&[4, 5, 6, 7],
"Nested thread allocation should still be 4-7",
);
});
tt.join().unwrap();
});
let _rr = rrt.rayon_pool.broadcast(|ctx| {
let _rr = rayon_runtime.rayon_pool.broadcast(|ctx| {
println!("Rayon thread {} reporting", ctx.index());
validate_affinity(&[1, 2, 3], "Rayon thread allocation should still be 1-3");
});
t.join().unwrap();
t2.join().unwrap();
}
}
3 changes: 2 additions & 1 deletion thread-manager/src/native_thread_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
crate::policy::{apply_policy, CoreAllocation},
anyhow::bail,
log::error,
serde::{Deserialize, Serialize},
solana_metrics::datapoint_info,
std::sync::{
Expand Down Expand Up @@ -72,7 +73,7 @@ impl<T> JoinHandle<T> {
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if self.std_handle.is_some() {
println!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your managed threads!");
error!("Attempting to drop a Join Handle of a running thread will leak thread IDs, please join your managed threads!");
self.join_inner().expect("Child thread panicked");
}
}
Expand Down

0 comments on commit df8b9d2

Please sign in to comment.