Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse benchmark threads across runs #44

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 11 additions & 24 deletions src/bench/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
mem::{self, MaybeUninit},
num::NonZeroUsize,
sync::Barrier,
thread,
};

use crate::{
Expand Down Expand Up @@ -714,29 +713,17 @@ impl<'a> BenchContext<'a> {
raw_samples.push(sample);
}
} else {
// TODO: Reuse auxiliary threads across samples.
thread::scope(|scope| {
let thread_handles: Vec<_> = (0..aux_thread_count)
.map(|_| scope.spawn(|| record_sample(&mut DeferStore::default())))
.collect();

let local_sample = record_sample(&mut defer_store);

if !is_test {
raw_samples.extend(
thread_handles
.into_iter()
.map(|handle| {
// Propagate panics to behave the same as
// automatic joining.
handle
.join()
.unwrap_or_else(|error| std::panic::resume_unwind(error))
})
.chain(Some(local_sample)),
);
}
});
self.shared_context.auxiliary_threads.with_threads(
aux_thread_count,
|| record_sample(&mut DeferStore::default()),
|threaded_samples| {
let local_sample = record_sample(&mut defer_store);

if !is_test {
raw_samples.extend(threaded_samples.chain(Some(local_sample)));
}
},
);
}

#[cfg(test)]
Expand Down
8 changes: 6 additions & 2 deletions src/bench/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ fn test_bencher(test: &mut dyn FnMut(Bencher)) {

for timer in Timer::available() {
for action in [Action::Bench, Action::Test] {
let shared_context =
SharedContext { action, timer, bench_overhead: FineDuration::default() };
let shared_context = SharedContext {
action,
timer,
auxiliary_threads: Default::default(),
bench_overhead: FineDuration::default(),
};

for &thread_count in THREAD_COUNTS {
let mut bench_context = BenchContext::new(
Expand Down
5 changes: 5 additions & 0 deletions src/divan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
BytesCount, BytesFormat, CharsCount, IntoCounter, ItemsCount, MaxCountUInt, PrivBytesFormat,
},
entry::{AnyBenchEntry, BenchEntryRunner, EntryTree},
threads::AuxiliaryThreads,
time::{FineDuration, Timer, TimerKind},
tree_painter::{TreeColumn, TreePainter},
util, Bencher,
Expand Down Expand Up @@ -40,6 +41,9 @@ pub(crate) struct SharedContext {
/// The timer used to measure samples.
pub timer: Timer,

/// Auxiliary benchmark threads that will be reused across runs.
pub auxiliary_threads: AuxiliaryThreads,

/// Per-iteration overhead.
///
/// `min_time` and `max_time` do not consider this as benchmarking time.
Expand Down Expand Up @@ -155,6 +159,7 @@ impl Divan {
let shared_context = SharedContext {
action,
timer,
auxiliary_threads: Default::default(),
bench_overhead: if action.is_bench() {
timer.measure_sample_loop_overhead()
} else {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod config;
mod divan;
mod entry;
mod stats;
mod threads;
mod time;
mod tree_painter;
mod util;
Expand Down
99 changes: 99 additions & 0 deletions src/threads.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::cell::RefCell;
use std::slice::IterMut;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

use crate::stats::RawSample;

#[derive(Default)]
pub(crate) struct AuxiliaryThreads {
inner: RefCell<Vec<BencherThread>>,
}

impl AuxiliaryThreads {
pub fn with_threads<ThreadFn, Scope, R>(
&self,
thread_count: usize,
task_fn: ThreadFn,
scope: Scope,
) -> R
where
ThreadFn: Fn() -> RawSample,
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
Scope: FnOnce(Results) -> R,
{
let mut threads = self.inner.borrow_mut();
if threads.len() < thread_count {
threads.resize_with(thread_count, BencherThread::new)
}
let threads = &mut threads[..thread_count];

// SAFETY: We wait for all child task to finish executing their copy of the
// `task_fn` within `Results::drop`. Therefore we are not holding on to the `task_fn`
// after returning from this function / scope.
let dyn_task_fn: &dyn Fn() -> RawSample = &task_fn;
let dyn_task_fn: Task = unsafe { std::mem::transmute(dyn_task_fn) };

for thread in threads.iter() {
thread.send_workload.as_ref().unwrap().send(dyn_task_fn).unwrap();
}

scope(Results { threads: threads.iter_mut() })
}
}

pub(crate) struct Results<'t> {
threads: IterMut<'t, BencherThread>,
}

impl Iterator for Results<'_> {
type Item = RawSample;

fn next(&mut self) -> Option<Self::Item> {
let thread = self.threads.next()?;
Some(thread.expect_result())
}
}

impl Drop for Results<'_> {
fn drop(&mut self) {
for _result in self {}
}
}
Swatinem marked this conversation as resolved.
Show resolved Hide resolved

type Task = &'static (dyn Fn() -> RawSample + Send + Sync);
Swatinem marked this conversation as resolved.
Show resolved Hide resolved

pub struct BencherThread {
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
handle: Option<JoinHandle<()>>,
send_workload: Option<Sender<Task>>,
receive_result: Receiver<RawSample>,
}

impl BencherThread {
pub fn new() -> Self {
let (send_workload, receive_workload) = channel::<Task>();
let (send_result, receive_result) = channel();

let handle = thread::spawn(move || {
for task in receive_workload {
let sample = (*task)();
send_result.send(sample).unwrap();
}
});

Self { handle: Some(handle), send_workload: Some(send_workload), receive_result }
}

pub fn expect_result(&mut self) -> RawSample {
self.receive_result.recv().unwrap_or_else(|_| {
let error = self.handle.take().unwrap().join().unwrap_err();
std::panic::resume_unwind(error)
})
}
}

impl Drop for BencherThread {
fn drop(&mut self) {
drop(self.send_workload.take());
self.handle.take().unwrap().join().unwrap()
}
}
Swatinem marked this conversation as resolved.
Show resolved Hide resolved
Loading