Skip to content

Commit

Permalink
ignore: Avoid contention on num_pending
Browse files Browse the repository at this point in the history
Previously, every worker would increment the shared num_pending count on
every new work item, and decrement it after finishing them, leading to
lots of contention.  Now, we only track the number of workers actively
running, so there is no contention except when workers go to sleep or
wake up.

Closes #2642
  • Loading branch information
tavianator authored and BurntSushi committed Nov 21, 2023
1 parent af55fc2 commit 6d7550d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Performance improvements:
Make most searches with `\b` look-arounds (among others) much faster.
* [PERF #2591](https://github.com/BurntSushi/ripgrep/pull/2591):
Parallel directory traversal now uses work stealing for faster searches.
* [PERF #2642](https://github.com/BurntSushi/ripgrep/pull/2642):
Parallel directory traversal has some contention reduced.

Feature enhancements:

Expand Down
45 changes: 20 additions & 25 deletions crates/ignore/src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ impl WalkParallel {
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
let num_pending = Arc::new(AtomicUsize::new(stack.len()));
let active_workers = Arc::new(AtomicUsize::new(threads));
let stacks = Stack::new_for_each_thread(threads, stack);
std::thread::scope(|s| {
let handles: Vec<_> = stacks
Expand All @@ -1288,7 +1288,7 @@ impl WalkParallel {
visitor: builder.build(),
stack,
quit_now: quit_now.clone(),
num_pending: num_pending.clone(),
active_workers: active_workers.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
Expand Down Expand Up @@ -1471,8 +1471,8 @@ struct Worker<'s> {
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if have a priority channel.
quit_now: Arc<AtomicBool>,
/// The number of outstanding work items.
num_pending: Arc<AtomicUsize>,
/// The number of currently active workers.
active_workers: Arc<AtomicUsize>,
/// The maximum depth of directories to descend. A value of `0` means no
/// descension at all.
max_depth: Option<usize>,
Expand Down Expand Up @@ -1500,7 +1500,6 @@ impl<'s> Worker<'s> {
if let WalkState::Quit = self.run_one(work) {
self.quit_now();
}
self.work_done();
}
}

Expand Down Expand Up @@ -1682,23 +1681,20 @@ impl<'s> Worker<'s> {
return None;
}
None => {
// Once num_pending reaches 0, it is impossible for it to
// ever increase again. Namely, it only reaches 0 once
// all jobs have run such that no jobs have produced more
// work. We have this guarantee because num_pending is
// always incremented before each job is submitted and only
// decremented once each job is completely finished.
// Therefore, if this reaches zero, then there can be no
// other job running.
if self.num_pending() == 0 {
// Every other thread is blocked at the next recv().
// Send the initial quit message and quit.
if self.deactivate_worker() == 0 {
// If deactivate_worker() returns 0, every worker thread
// is currently within the critical section between the
// acquire in deactivate_worker() and the release in
// activate_worker() below. For this to happen, every
// worker's local deque must be simultaneously empty,
// meaning there is no more work left at all.
self.send_quit();
return None;
}
// Wait for next `Work` or `Quit` message.
loop {
if let Some(v) = self.recv() {
self.activate_worker();
value = Some(v);
break;
}
Expand All @@ -1724,14 +1720,8 @@ impl<'s> Worker<'s> {
self.quit_now.load(AtomicOrdering::SeqCst)
}

/// Returns the number of pending jobs.
fn num_pending(&self) -> usize {
self.num_pending.load(AtomicOrdering::SeqCst)
}

/// Send work.
fn send(&self, work: Work) {
self.num_pending.fetch_add(1, AtomicOrdering::SeqCst);
self.stack.push(Message::Work(work));
}

Expand All @@ -1745,9 +1735,14 @@ impl<'s> Worker<'s> {
self.stack.pop()
}

/// Signal that work has been finished.
fn work_done(&self) {
self.num_pending.fetch_sub(1, AtomicOrdering::SeqCst);
/// Deactivates a worker and returns the number of currently active workers.
fn deactivate_worker(&self) -> usize {
self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1
}

/// Reactivates a worker.
fn activate_worker(&self) {
self.active_workers.fetch_add(1, AtomicOrdering::Release);
}
}

Expand Down

0 comments on commit 6d7550d

Please sign in to comment.