Reuse threads across multi-threaded benchmarks
Once Divan spawns threads, it will keep them around for later benchmarks
to reuse. The result is that when running Divan benchmarks under a
sampling profiler, the profiler's output will be cleaner and easier to
understand.

Fixes #37.

Closes #44.
nvzqz committed Nov 25, 2024
1 parent 364e41b commit 4c94258
Showing 8 changed files with 409 additions and 61 deletions.
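As context for the diffs below, here is a minimal, self-contained sketch of the pattern the commit message describes: worker threads that are spawned once and then handed closures by each later multi-threaded benchmark. The `ReusablePool`, `run_on_threads`, and `POOL` names are illustrative assumptions, not Divan's internals; the real implementation lives in the new `thread_pool` module and is reached through the `BENCH_POOL.par_extend(..)` call visible in `src/bench/mod.rs`, whose exact signature is not shown in this diff.

```rust
use std::sync::{
    mpsc::{channel, Sender},
    Mutex, OnceLock,
};
use std::thread;

/// A boxed task sent to a pooled worker thread.
type Task = Box<dyn FnOnce() + Send>;

/// Long-lived worker threads that block on a channel and run whatever
/// closures they receive, so later benchmarks reuse the same OS threads
/// instead of spawning fresh ones.
struct ReusablePool {
    senders: Mutex<Vec<Sender<Task>>>,
}

impl ReusablePool {
    const fn new() -> Self {
        Self { senders: Mutex::new(Vec::new()) }
    }

    /// Runs `task` on `aux_threads` pooled workers plus the calling thread,
    /// returning one result per thread. Panic propagation is omitted here;
    /// the real pool reports which benchmarking thread panicked.
    fn run_on_threads<T, F>(&self, aux_threads: usize, task: F) -> Vec<T>
    where
        T: Send + 'static,
        F: Fn() -> T + Clone + Send + 'static,
    {
        let mut senders = self.senders.lock().unwrap();

        // Lazily grow the pool; workers spawned earlier are reused as-is.
        while senders.len() < aux_threads {
            let (tx, rx) = channel::<Task>();
            thread::spawn(move || {
                // Worker loop: run tasks until the pool drops the sender.
                while let Ok(task) = rx.recv() {
                    task();
                }
            });
            senders.push(tx);
        }

        // Fan the task out and collect results over a channel.
        let (result_tx, result_rx) = channel();
        for sender in senders.iter().take(aux_threads) {
            let task = task.clone();
            let result_tx = result_tx.clone();
            sender
                .send(Box::new(move || {
                    let _ = result_tx.send(task());
                }))
                .expect("worker thread exited unexpectedly");
        }

        // The calling thread participates too, mirroring how the benchmark's
        // own thread records a sample alongside the auxiliary threads.
        let mut results = vec![task()];
        results.extend(result_rx.iter().take(aux_threads));
        results
    }
}

/// A process-wide pool, comparable in spirit to the `BENCH_POOL` static
/// referenced in the diff (the real type and API differ).
static POOL: OnceLock<ReusablePool> = OnceLock::new();

fn main() {
    let pool = POOL.get_or_init(ReusablePool::new);
    // The first call spawns workers; later calls reuse them, so a sampling
    // profiler sees one stable set of threads across benchmarks.
    let first: Vec<u64> = pool.run_on_threads(3, || (0..1_000u64).sum());
    let second: Vec<u64> = pool.run_on_threads(3, || (0..2_000u64).sum());
    assert_eq!((first.len(), second.len()), (4, 4));
}
```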
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,10 @@ Versioning](http://semver.org/spec/v2.0.0.html).

### Added

- Thread pool for reusing threads across multi-threaded benchmarks. The result
is that when running Divan benchmarks under a sampling profiler, the
profiler's output will be cleaner and easier to understand.

- Track the maximum number of allocations during a benchmark.

### Changed
85 changes: 26 additions & 59 deletions src/bench/mod.rs
@@ -4,7 +4,6 @@ use std::{
mem::{self, MaybeUninit},
num::NonZeroUsize,
sync::Barrier,
thread,
};

use crate::{
@@ -18,6 +17,7 @@ use crate::{
},
divan::SharedContext,
stats::{RawSample, SampleCollection, Stats, StatsSet, TimeSample},
thread_pool::BENCH_POOL,
time::{FineDuration, Timestamp, UntaggedTimestamp},
util::{self, sync::SyncWrap, Unit},
};
@@ -607,7 +607,6 @@ impl<'a> BenchContext<'a> {
let is_test = current_mode.is_test();

let record_sample = self.sample_recorder(gen_input, benched, drop_input);
let mut defer_store = DeferStore::default();

let thread_count = self.thread_count.get();
let aux_thread_count = thread_count - 1;
@@ -617,7 +616,7 @@
// Per-thread sample info returned by `record_sample`. These are
// processed locally to emit user-facing sample info. As a result, this
// only contains `thread_count` many elements at a time.
let mut raw_samples = Vec::<RawSample>::new();
let mut raw_samples = Vec::<Option<RawSample>>::new();

// The time spent benchmarking, in picoseconds.
//
@@ -678,7 +677,7 @@ impl<'a> BenchContext<'a> {
let barrier = if is_single_thread { None } else { Some(Barrier::new(thread_count)) };

// Sample loop helper:
let record_sample = |defer_store: &mut DeferStore<I, O>| -> RawSample {
let record_sample = || -> RawSample {
let mut counter_totals: [u128; KnownCounterKind::COUNT] =
[0; KnownCounterKind::COUNT];

@@ -697,57 +696,31 @@
};

// Sample loop:
let ([start, end], alloc_info) = record_sample(
sample_size as usize,
barrier.as_ref(),
defer_store,
&mut count_input,
);
let ([start, end], alloc_info) =
record_sample(sample_size as usize, barrier.as_ref(), &mut count_input);

RawSample { start, end, timer, alloc_info, counter_totals }
};

// Sample loop:
raw_samples.clear();
if is_single_thread {
let sample = record_sample(&mut defer_store);
if !is_test {
raw_samples.push(sample);
BENCH_POOL.par_extend(&mut raw_samples, aux_thread_count, |_| record_sample());

// Convert `&[Option<RawSample>]` to `&[Sample]`.
let raw_samples: &[RawSample] = {
if let Some(thread) = raw_samples
.iter()
.enumerate()
.find_map(|(thread, sample)| sample.is_none().then_some(thread))
{
panic!("Divan benchmarking thread {thread} panicked");
}
} else {
// TODO: Reuse auxiliary threads across samples.
thread::scope(|scope| {
let thread_handles: Vec<_> = (0..aux_thread_count)
.map(|_| scope.spawn(|| record_sample(&mut DeferStore::default())))
.collect();

let local_sample = record_sample(&mut defer_store);

if !is_test {
raw_samples.extend(
thread_handles
.into_iter()
.map(|handle| {
// Propagate panics to behave the same as
// automatic joining.
handle
.join()
.unwrap_or_else(|error| std::panic::resume_unwind(error))
})
.chain(Some(local_sample)),
);
}
});
}

#[cfg(test)]
if is_test {
// '--test' should run the expected number of times but not
// allocate any samples.
assert_eq!(raw_samples.capacity(), 0);
} else {
assert_eq!(raw_samples.len(), thread_count);
}
unsafe {
assert_eq!(size_of::<RawSample>(), size_of::<Option<RawSample>>());
std::slice::from_raw_parts(raw_samples.as_ptr().cast(), raw_samples.len())
}
};

// If testing, exit the benchmarking loop immediately after timing a
// single run.
@@ -794,7 +767,7 @@ impl<'a> BenchContext<'a> {
.clamp_to(timer_precision)
};

for raw_sample in &raw_samples {
for raw_sample in raw_samples {
let sample_index = self.samples.time_samples.len();

self.samples
@@ -849,12 +822,8 @@ impl<'a> BenchContext<'a> {
gen_input: impl Fn() -> I,
benched: impl Fn(&UnsafeCell<MaybeUninit<I>>) -> O,
drop_input: impl Fn(&UnsafeCell<MaybeUninit<I>>),
) -> impl Fn(
usize,
Option<&Barrier>,
&mut DeferStore<I, O>,
&mut dyn FnMut(&I),
) -> ([Timestamp; 2], ThreadAllocInfo) {
) -> impl Fn(usize, Option<&Barrier>, &mut dyn FnMut(&I)) -> ([Timestamp; 2], ThreadAllocInfo)
{
// We defer:
// - Usage of `gen_input` values.
// - Drop destructor for `O`, preventing it from affecting sample
@@ -864,12 +833,10 @@

let timer_kind = self.shared_context.timer.kind();

move |sample_size: usize,
barrier: Option<&Barrier>,
defer_store: &mut DeferStore<I, O>,
count_input: &mut dyn FnMut(&I)| {
let mut saved_alloc_info = ThreadAllocInfo::new();
move |sample_size: usize, barrier: Option<&Barrier>, count_input: &mut dyn FnMut(&I)| {
let mut defer_store = DeferStore::<I, O>::default();

let mut saved_alloc_info = ThreadAllocInfo::new();
let mut save_alloc_info = || {
if crate::alloc::IGNORE_ALLOC.get() {
return;
5 changes: 5 additions & 0 deletions src/bench/tests.rs
@@ -6,6 +6,8 @@ use std::{
sync::atomic::{AtomicUsize, Ordering::SeqCst},
};

use util::defer;

use super::*;
use crate::{
config::Action,
@@ -33,6 +35,9 @@ const THREAD_COUNTS: &[usize] = if cfg!(miri) {

#[track_caller]
fn test_bencher(test: &mut dyn FnMut(Bencher)) {
// Silence Miri about leaking threads.
let _drop_threads = defer(|| BENCH_POOL.drop_threads());

let bench_options = BenchOptions {
sample_count: Some(SAMPLE_COUNT),
sample_size: Some(SAMPLE_SIZE),
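The `defer(|| BENCH_POOL.drop_threads())` guards added above and in `run_action` below rely on a scope-exit helper from Divan's `util` module. A rough sketch of how such a guard typically works follows; the `Defer` type here is an illustrative assumption, not Divan's actual definition.

```rust
/// Runs the given closure when the returned guard is dropped, i.e. on scope
/// exit. Illustrative only; Divan's `util::defer` may differ in details.
struct Defer<F: FnOnce()>(Option<F>);

fn defer<F: FnOnce()>(f: F) -> Defer<F> {
    Defer(Some(f))
}

impl<F: FnOnce()> Drop for Defer<F> {
    fn drop(&mut self) {
        if let Some(f) = self.0.take() {
            f();
        }
    }
}

fn main() {
    // The closure runs when `_cleanup` leaves scope, including on early
    // return or panic unwind, which is why the tests can count on the pool's
    // threads being torn down before Miri checks for leaks.
    let _cleanup = defer(|| println!("cleaning up"));
    println!("doing work");
}
```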
6 changes: 5 additions & 1 deletion src/divan.rs
@@ -13,9 +13,11 @@ use crate::{
PrivBytesFormat,
},
entry::{AnyBenchEntry, BenchEntryRunner, EntryTree},
thread_pool::BENCH_POOL,
time::{Timer, TimerKind},
tree_painter::{TreeColumn, TreePainter},
util, Bencher,
util::{self, defer},
Bencher,
};

/// The benchmark runner.
@@ -94,6 +96,8 @@ impl Divan {
}

pub(crate) fn run_action(&self, action: Action) {
let _drop_threads = defer(|| BENCH_POOL.drop_threads());

let mut tree: Vec<EntryTree> = if cfg!(miri) {
// Miri does not work with our linker tricks.
Vec::new()
1 change: 1 addition & 0 deletions src/lib.rs
@@ -25,6 +25,7 @@ mod config;
mod divan;
mod entry;
mod stats;
mod thread_pool;
mod time;
mod tree_painter;
mod util;