Reuse benchmark threads across runs
Instead of using `thread::scope`, this implements a similar concept, but reuses a global list of threads.
Swatinem authored and nvzqz committed Feb 11, 2024
1 parent 1c182e9 commit a0f36c8
Showing 5 changed files with 122 additions and 26 deletions.
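For context before the diffs: the new code replaces per-sample `thread::scope` spawning with a pool of long-lived worker threads, each parked on a channel and fed one closure per sample. Below is a minimal, self-contained sketch of that pattern, not the crate's actual code (it boxes a `FnOnce` and returns `u64`, whereas the commit sends a transmuted `&dyn Fn() -> RawSample` reference to avoid allocation):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

type Task = Box<dyn FnOnce() -> u64 + Send>;

struct Worker {
    send_task: Option<Sender<Task>>,
    recv_result: Receiver<u64>,
    handle: Option<JoinHandle<()>>,
}

impl Worker {
    fn new() -> Self {
        let (send_task, recv_task) = channel::<Task>();
        let (send_result, recv_result) = channel();

        // The spawned thread loops until `send_task` is dropped, running each
        // task it receives and sending the result back.
        let handle = thread::spawn(move || {
            for task in recv_task {
                let _ = send_result.send(task());
            }
        });

        Self { send_task: Some(send_task), recv_result, handle: Some(handle) }
    }

    fn run(&self, task: Task) -> u64 {
        self.send_task.as_ref().unwrap().send(task).unwrap();
        self.recv_result.recv().unwrap()
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        // Closing the channel ends the worker's loop; then join the thread.
        drop(self.send_task.take());
        let _ = self.handle.take().unwrap().join();
    }
}

fn main() {
    let worker = Worker::new();
    // Both calls are serviced by the same OS thread; nothing is respawned.
    assert_eq!(worker.run(Box::new(|| 1u64 + 1)), 2);
    assert_eq!(worker.run(Box::new(|| 21u64 * 2)), 42);
}
```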
35 changes: 11 additions & 24 deletions src/bench/mod.rs
@@ -4,7 +4,6 @@ use std::{
mem::{self, MaybeUninit},
num::NonZeroUsize,
sync::Barrier,
thread,
};

use crate::{
@@ -714,29 +713,17 @@ impl<'a> BenchContext<'a> {
raw_samples.push(sample);
}
} else {
// TODO: Reuse auxiliary threads across samples.
thread::scope(|scope| {
let thread_handles: Vec<_> = (0..aux_thread_count)
.map(|_| scope.spawn(|| record_sample(&mut DeferStore::default())))
.collect();

let local_sample = record_sample(&mut defer_store);

if !is_test {
raw_samples.extend(
thread_handles
.into_iter()
.map(|handle| {
// Propagate panics to behave the same as
// automatic joining.
handle
.join()
.unwrap_or_else(|error| std::panic::resume_unwind(error))
})
.chain(Some(local_sample)),
);
}
});
self.shared_context.auxiliary_threads.with_threads(
aux_thread_count,
|| record_sample(&mut DeferStore::default()),
|threaded_samples| {
let local_sample = record_sample(&mut defer_store);

if !is_test {
raw_samples.extend(threaded_samples.chain(Some(local_sample)));
}
},
);
}

#[cfg(test)]
8 changes: 6 additions & 2 deletions src/bench/tests.rs
@@ -41,8 +41,12 @@ fn test_bencher(test: &mut dyn FnMut(Bencher)) {

for timer in Timer::available() {
for action in [Action::Bench, Action::Test] {
let shared_context =
SharedContext { action, timer, bench_overhead: FineDuration::default() };
let shared_context = SharedContext {
action,
timer,
auxiliary_threads: Default::default(),
bench_overhead: FineDuration::default(),
};

for &thread_count in THREAD_COUNTS {
let mut bench_context = BenchContext::new(
5 changes: 5 additions & 0 deletions src/divan.rs
@@ -12,6 +12,7 @@ use crate::{
BytesCount, BytesFormat, CharsCount, IntoCounter, ItemsCount, MaxCountUInt, PrivBytesFormat,
},
entry::{AnyBenchEntry, BenchEntryRunner, EntryTree},
threads::AuxiliaryThreads,
time::{FineDuration, Timer, TimerKind},
tree_painter::{TreeColumn, TreePainter},
util, Bencher,
@@ -40,6 +41,9 @@ pub(crate) struct SharedContext {
/// The timer used to measure samples.
pub timer: Timer,

/// Auxiliary benchmark threads that will be reused across runs.
pub auxiliary_threads: AuxiliaryThreads,

/// Per-iteration overhead.
///
/// `min_time` and `max_time` do not consider this as benchmarking time.
@@ -155,6 +159,7 @@ impl Divan {
let shared_context = SharedContext {
action,
timer,
auxiliary_threads: Default::default(),
bench_overhead: if action.is_bench() {
timer.measure_sample_loop_overhead()
} else {
1 change: 1 addition & 0 deletions src/lib.rs
@@ -18,6 +18,7 @@ mod config;
mod divan;
mod entry;
mod stats;
mod threads;
mod time;
mod tree_painter;
mod util;
99 changes: 99 additions & 0 deletions src/threads.rs
@@ -0,0 +1,99 @@
use std::cell::RefCell;
use std::slice::IterMut;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

use crate::stats::RawSample;

#[derive(Default)]
pub(crate) struct AuxiliaryThreads {
inner: RefCell<Vec<BencherThread>>,
}

impl AuxiliaryThreads {
pub fn with_threads<ThreadFn, Scope, R>(
&self,
thread_count: usize,
task_fn: ThreadFn,
scope: Scope,
) -> R
where
ThreadFn: Fn() -> RawSample,
Scope: FnOnce(Results) -> R,
{
let mut threads = self.inner.borrow_mut();
if threads.len() < thread_count {
threads.resize_with(thread_count, BencherThread::new)
}
let threads = &mut threads[..thread_count];

// SAFETY: We wait for all child tasks to finish executing their copy of the
// `task_fn` within `Results::drop`. Therefore we are not holding on to the `task_fn`
// after returning from this function/scope.
let dyn_task_fn: &dyn Fn() -> RawSample = &task_fn;
let dyn_task_fn: Task = unsafe { std::mem::transmute(dyn_task_fn) };

for thread in threads.iter() {
thread.send_workload.as_ref().unwrap().send(dyn_task_fn).unwrap();
}

scope(Results { threads: threads.iter_mut() })
}
}

pub(crate) struct Results<'t> {
threads: IterMut<'t, BencherThread>,
}

impl Iterator for Results<'_> {
type Item = RawSample;

fn next(&mut self) -> Option<Self::Item> {
let thread = self.threads.next()?;
Some(thread.expect_result())
}
}

impl Drop for Results<'_> {
fn drop(&mut self) {
for _result in self {}
}
}

type Task = &'static (dyn Fn() -> RawSample + Send + Sync);

pub struct BencherThread {
handle: Option<JoinHandle<()>>,
send_workload: Option<Sender<Task>>,
receive_result: Receiver<RawSample>,
}

impl BencherThread {
pub fn new() -> Self {
let (send_workload, receive_workload) = channel::<Task>();
let (send_result, receive_result) = channel();

let handle = thread::spawn(move || {
for task in receive_workload {
let sample = (*task)();
send_result.send(sample).unwrap();
}
});

Self { handle: Some(handle), send_workload: Some(send_workload), receive_result }
}

pub fn expect_result(&mut self) -> RawSample {
self.receive_result.recv().unwrap_or_else(|_| {
let error = self.handle.take().unwrap().join().unwrap_err();
std::panic::resume_unwind(error)
})
}
}

impl Drop for BencherThread {
fn drop(&mut self) {
drop(self.send_workload.take());
self.handle.take().unwrap().join().unwrap()
}
}
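
To make the reuse behavior concrete, a hedged sketch of how the new API could be exercised in a unit test is shown below; it is not part of this commit, and `make_sample()` stands in for whatever produces a `RawSample`:

```rust
#[test]
fn reuses_auxiliary_threads() {
    let aux = AuxiliaryThreads::default();

    // First call spawns four workers and drains one result from each.
    aux.with_threads(4, || make_sample(), |results| {
        assert_eq!(results.count(), 4);
    });

    // Second call with the same count reuses those workers; `with_threads`
    // only grows the pool when `thread_count` exceeds its current length.
    aux.with_threads(4, || make_sample(), |results| {
        assert_eq!(results.count(), 4);
    });

    // A larger request spawns just the two missing workers.
    aux.with_threads(6, || make_sample(), |results| {
        assert_eq!(results.count(), 6);
    });
}
```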
